Merge branch 'plugin-refactor'

This commit is contained in:
2024-08-31 06:46:22 +10:00
11 changed files with 8 additions and 1864 deletions

6
.gitmodules vendored Normal file
View File

@@ -0,0 +1,6 @@
[submodule "src/modules/voicefix"]
path = src/modules/voicefix
url = git@github.com:Intery/StudyLion-voicefix.git
[submodule "src/modules/streamalerts"]
path = src/modules/streamalerts
url = git@github.com:Intery/StudyLion-streamalerts.git

View File

@@ -1,8 +0,0 @@
import logging
from meta import LionBot
logger = logging.getLogger(__name__)
async def setup(bot: LionBot):
from .cog import AlertCog
await bot.add_cog(AlertCog(bot))

View File

@@ -1,611 +0,0 @@
import asyncio
from typing import Optional
import discord
from discord.ext import commands as cmds
from discord import app_commands as appcmds
from twitchAPI.twitch import Twitch
from twitchAPI.helper import first
from meta import LionBot, LionCog, LionContext
from meta.errors import UserInputError
from meta.logger import log_wrap
from utils.lib import utc_now
from data.conditions import NULL
from . import logger
from .data import AlertsData
from .settings import AlertConfig, AlertSettings
from .editor import AlertEditorUI
class AlertCog(LionCog):
POLL_PERIOD = 60
def __init__(self, bot: LionBot):
self.bot = bot
self.data = bot.db.load_registry(AlertsData())
self.twitch = None
self.alert_settings = AlertSettings()
self.poll_task = None
self.event_tasks = set()
# Cache of currently live streams, maps streamerid -> stream
self.live_streams = {}
# Cache of streamers we are watching state changes for
# Map of streamerid -> streamer
self.watching = {}
async def cog_load(self):
await self.data.init()
await self.twitch_login()
await self.load_subs()
self.poll_task = asyncio.create_task(self.poll_live())
async def twitch_login(self):
# TODO: Probably abstract this out to core or a dedicated core cog
# Also handle refresh tokens
if self.twitch is not None:
await self.twitch.close()
self.twitch = None
self.twitch = await Twitch(
self.bot.config.twitch['app_id'].strip(),
self.bot.config.twitch['app_secret'].strip()
)
async def load_subs(self):
# Load active subscriptions
active_subs = await self.data.AlertChannel.fetch_where()
to_watch = {sub.streamerid for sub in active_subs}
live_streams = await self.data.Stream.fetch_where(
self.data.Stream.end_at != NULL
)
to_watch.union(stream.streamerid for stream in live_streams)
# Load associated streamers
watching = {}
if to_watch:
streamers = await self.data.Streamer.fetch_where(
userid=list(to_watch)
)
for streamer in streamers:
watching[streamer.userid] = streamer
self.watching = watching
self.live_streams = {stream.streamerid: stream for stream in live_streams}
logger.info(
f"Watching {len(watching)} streamers for state changes. "
f"Loaded {len(live_streams)} (previously) live streams into cache."
)
async def poll_live(self):
# Every PERIOD seconds,
# request get_streams for the streamers we are currently watching.
# Check if they are in the live_stream cache,
# and update cache and data and fire-and-forget start/stop events as required.
# TODO: Logging
# TODO: Error handling so the poll loop doesn't die from temporary errors
# And when it does die it gets logged properly.
if not self.twitch:
raise ValueError("Attempting to start alert poll-loop before twitch set.")
block_i = 0
self.polling = True
while self.polling:
await asyncio.sleep(self.POLL_PERIOD)
to_request = list(self.watching.keys())
if not to_request:
continue
# Each loop we request the 'next' slice of 100 userids
blocks = [to_request[i:i+100] for i in range(0, len(to_request), 100)]
block_i += 1
block_i %= len(blocks)
block = blocks[block_i]
streaming = {}
async for stream in self.twitch.get_streams(user_id=block, first=100):
# Note we set page size to 100
# So we should never get repeat or missed streams
# Since we can request a max of 100 userids anyway.
streaming[int(stream.user_id)] = stream
started = set(streaming.keys()).difference(self.live_streams.keys())
ended = set(self.live_streams.keys()).difference(streaming.keys())
for streamerid in started:
stream = streaming[streamerid]
stream_data = await self.data.Stream.create(
streamerid=int(stream.user_id),
start_at=stream.started_at,
twitch_stream_id=int(stream.id),
game_name=stream.game_name,
title=stream.title,
)
self.live_streams[streamerid] = stream_data
task = asyncio.create_task(self.on_stream_start(stream_data))
self.event_tasks.add(task)
task.add_done_callback(self.event_tasks.discard)
for streamerid in ended:
stream_data = self.live_streams.pop(streamerid)
await stream_data.update(end_at=utc_now())
task = asyncio.create_task(self.on_stream_end(stream_data))
self.event_tasks.add(task)
task.add_done_callback(self.event_tasks.discard)
async def on_stream_start(self, stream_data):
# Get channel subscriptions listening for this streamer
uid = int(stream_data.streamerid)
logger.info(f"Streamer <uid:{uid}> started streaming! {stream_data=}")
subbed = await self.data.AlertChannel.fetch_where(streamerid=uid)
# Fulfill those alerts
for sub in subbed:
try:
# If the sub is paused, don't create the alert
await self.sub_alert(sub, stream_data)
except discord.HTTPException:
# TODO: Needs to be handled more gracefully at user level
# Retry logic?
logger.warning(
f"Could not complete subscription {sub=} for {stream_data=}", exc_info=True
)
except Exception:
logger.exception(
f"Unexpected exception completing {sub=} for {stream_data=}"
)
raise
async def subscription_error(self, subscription, stream_data, err_msg):
"""
Handle a subscription fulfill failure.
Stores the error message for user display,
and deletes the subscription after some number of errors.
# TODO
"""
logger.warning(
f"Subscription error {subscription=} {stream_data=} {err_msg=}"
)
async def sub_alert(self, subscription, stream_data):
# Base alert behaviour is just to send a message
# and create an alert row
channel = self.bot.get_channel(subscription.channelid)
if channel is None or not isinstance(channel, discord.abc.Messageable):
# Subscription channel is gone!
# Or the Discord channel cache died
await self.subscription_error(
subscription, stream_data,
"Subscription channel no longer exists."
)
return
permissions = channel.permissions_for(channel.guild.me)
if not (permissions.send_messages and permissions.embed_links):
await self.subscription_error(
subscription, stream_data,
"Insufficient permissions to post alert message."
)
return
# Build message
streamer = await self.data.Streamer.fetch(int(stream_data.streamerid))
if not streamer:
# Streamer was deleted while handling the alert
# Just quietly ignore
# Don't error out because the stream data row won't exist anymore
logger.warning(
f"Cancelling alert for subscription {subscription.subscriptionid}"
" because the streamer no longer exists."
)
return
alert_config = AlertConfig(subscription.subscriptionid, subscription)
paused = alert_config.get(self.alert_settings.AlertPaused.setting_id)
if paused.value:
logger.info(f"Skipping alert for subscription {subscription=} because it is paused.")
return
live_message = alert_config.get(self.alert_settings.AlertMessage.setting_id)
formatter = await live_message.generate_formatter(self.bot, stream_data, streamer)
formatted = await formatter(live_message.value)
args = live_message.value_to_args(subscription.subscriptionid, formatted)
try:
message = await channel.send(**args.send_args)
except discord.HTTPException as e:
logger.warning(
f"Message send failure while sending streamalert {subscription.subscriptionid}",
exc_info=True
)
await self.subscription_error(
subscription, stream_data,
"Failed to post live alert."
)
return
# Store sent alert
alert = await self.data.StreamAlert.create(
streamid=int(stream_data.streamid),
subscriptionid=subscription.subscriptionid,
sent_at=utc_now(),
messageid=message.id
)
logger.debug(
f"Fulfilled subscription {subscription.subscriptionid} with alert {alert.alertid}"
)
async def on_stream_end(self, stream_data):
# Get channel subscriptions listening for this streamer
uid = int(stream_data.streamerid)
logger.info(f"Streamer <uid:{uid}> stopped streaming! {stream_data=}")
subbed = await self.data.AlertChannel.fetch_where(streamerid=uid)
# Resolve subscriptions
for sub in subbed:
try:
await self.sub_resolve(sub, stream_data)
except discord.HTTPException:
# TODO: Needs to be handled more gracefully at user level
# Retry logic?
logger.warning(
f"Could not resolve subscription {sub=} for {stream_data=}", exc_info=True
)
except Exception:
logger.exception(
f"Unexpected exception resolving {sub=} for {stream_data=}"
)
raise
async def sub_resolve(self, subscription, stream_data):
# Check if there is a current active alert to resolve
alerts = await self.data.StreamAlert.fetch_where(
streamid=int(stream_data.streamid),
subscriptionid=int(subscription.subscriptionid),
)
if not alerts:
logger.info(
f"Resolution requested for subscription {subscription.subscriptionid} with stream {stream_data.streamid} "
"but no active alerts were found."
)
return
alert = alerts[0]
if alert.resolved_at is not None:
# Alert was already resolved
# This is okay, Twitch might have just sent the stream ending twice
logger.info(
f"Resolution requested for subscription {subscription.subscriptionid} with stream {stream_data.streamid} "
"but alert was already resolved."
)
return
# Check if message is to be deleted or edited (or nothing)
alert_config = AlertConfig(subscription.subscriptionid, subscription)
del_setting = alert_config.get(self.alert_settings.AlertEndDelete.setting_id)
edit_setting = alert_config.get(self.alert_settings.AlertEndMessage.setting_id)
if (delmsg := del_setting.value) or (edit_setting.value):
# Find the message
message = None
channel = self.bot.get_channel(subscription.channelid)
if channel:
try:
message = await channel.fetch_message(alert.messageid)
except discord.HTTPException:
# Message was probably deleted already
# Or permissions were changed
# Or Discord connection broke
pass
else:
# Channel went after posting the alert
# Or Discord cache sucks
# Nothing we can do, just mark it handled
pass
if message:
if delmsg:
# Delete the message
try:
await message.delete()
except discord.HTTPException:
logger.warning(
f"Discord exception while del-resolve live alert {alert=}",
exc_info=True
)
else:
# Edit message with custom arguments
streamer = await self.data.Streamer.fetch(int(stream_data.streamerid))
formatter = await edit_setting.generate_formatter(self.bot, stream_data, streamer)
formatted = await formatter(edit_setting.value)
args = edit_setting.value_to_args(subscription.subscriptionid, formatted)
try:
await message.edit(**args.edit_args)
except discord.HTTPException:
logger.warning(
f"Discord exception while edit-resolve live alert {alert=}",
exc_info=True
)
else:
# Explicitly don't need to do anything to the alert
pass
# Save alert as resolved
await alert.update(resolved_at=utc_now())
async def cog_unload(self):
if self.poll_task is not None and not self.poll_task.cancelled():
self.poll_task.cancel()
if self.twitch is not None:
await self.twitch.close()
self.twitch = None
# ----- Commands -----
@cmds.hybrid_group(
name='streamalert',
description=(
"Create and configure stream live-alerts."
)
)
@cmds.guild_only()
@appcmds.default_permissions(manage_channels=True)
async def streamalert_group(self, ctx: LionContext):
# Placeholder group, method not used
raise NotImplementedError
@streamalert_group.command(
name='create',
description=(
"Subscribe a Discord channel to notifications when a Twitch stream goes live."
)
)
@appcmds.describe(
streamer="Name of the twitch channel to watch.",
channel="Which Discord channel to send live alerts in.",
message="Custom message to send when the channel goes live (may be edited later)."
)
@appcmds.default_permissions(manage_channels=True)
async def streamalert_create_cmd(self, ctx: LionContext,
streamer: str,
channel: discord.TextChannel,
message: Optional[str]):
# Type guards
assert ctx.guild is not None, "Guild-only command has no guild ctx."
assert self.twitch is not None, "Twitch command run with no twitch obj."
# Wards
if not channel.permissions_for(ctx.author).manage_channels:
await ctx.error_reply(
"Sorry, you need the `MANAGE_CHANNELS` permission "
"to add a stream alert to a channel."
)
return
# Look up the specified streamer
tw_user = await first(self.twitch.get_users(logins=[streamer]))
if not tw_user:
await ctx.error_reply(
f"Sorry, could not find `{streamer}` on Twitch! "
"Make sure you use the name in their channel url."
)
return
# Create streamer data if it doesn't already exist
streamer_data = await self.data.Streamer.fetch_or_create(
int(tw_user.id),
login_name=tw_user.login,
display_name=tw_user.display_name,
)
# Add subscription to alerts list
sub_data = await self.data.AlertChannel.create(
streamerid=streamer_data.userid,
guildid=channel.guild.id,
channelid=channel.id,
created_by=ctx.author.id,
paused=False
)
# Add to watchlist
self.watching[streamer_data.userid] = streamer_data
# Open AlertEditorUI for the new subscription
await ctx.reply("StreamAlert Created.")
ui = AlertEditorUI(bot=self.bot, sub_data=sub_data, callerid=ctx.author.id)
await ui.run(ctx.interaction)
await ui.wait()
async def alert_acmpl(self, interaction: discord.Interaction, partial: str):
if not interaction.guild:
raise ValueError("Cannot acmpl alert in guildless interaction.")
# Get all alerts in the server
alerts = await self.data.AlertChannel.fetch_where(guildid=interaction.guild_id)
if not alerts:
# No alerts available
options = [
appcmds.Choice(
name="No stream alerts are set up in this server!",
value=partial
)
]
else:
options = []
for alert in alerts:
streamer = await self.data.Streamer.fetch(alert.streamerid)
if streamer is None:
# Should be impossible by foreign key condition
# Might be a stale cache
continue
channel = interaction.guild.get_channel(alert.channelid)
display = f"{streamer.display_name} in #{channel.name if channel else 'unknown'}"
if partial.lower() in display.lower():
# Matching option
options.append(appcmds.Choice(name=display, value=str(alert.subscriptionid)))
if not options:
options.append(
appcmds.Choice(
name=f"No stream alerts matching {partial}"[:25],
value=partial
)
)
return options
async def resolve_alert(self, interaction: discord.Interaction, alert_str: str):
if not interaction.guild:
raise ValueError("Resolving alert outside of a guild.")
# Expect alert_str to be the integer subscriptionid
if not alert_str.isdigit():
raise UserInputError(
f"No stream alerts in this server matching `{alert_str}`!"
)
alert = await self.data.AlertChannel.fetch(int(alert_str))
if not alert or not alert.guildid == interaction.guild_id:
raise UserInputError(
"Could not find the selected alert! Please try again."
)
return alert
@streamalert_group.command(
name='edit',
description=(
"Update settings for an existing Twitch stream alert."
)
)
@appcmds.describe(
alert="Which alert do you want to edit?",
# TODO: Other settings here
)
@appcmds.default_permissions(manage_channels=True)
async def streamalert_edit_cmd(self, ctx: LionContext, alert: str):
# Type guards
assert ctx.guild is not None, "Guild-only command has no guild ctx."
assert self.twitch is not None, "Twitch command run with no twitch obj."
assert ctx.interaction is not None, "Twitch command needs interaction ctx."
# Look up provided alert
sub_data = await self.resolve_alert(ctx.interaction, alert)
# Check user permissions for editing this alert
channel = ctx.guild.get_channel(sub_data.channelid)
permlevel = channel if channel else ctx.guild
if not permlevel.permissions_for(ctx.author).manage_channels:
await ctx.error_reply(
"Sorry, you need the `MANAGE_CHANNELS` permission "
"in this channel to edit the stream alert."
)
return
# If edit options have been given, save edits and retouch cache if needed
# If not, open AlertEditorUI
ui = AlertEditorUI(bot=self.bot, sub_data=sub_data, callerid=ctx.author.id)
await ui.run(ctx.interaction)
await ui.wait()
@streamalert_edit_cmd.autocomplete('alert')
async def streamalert_edit_cmd_alert_acmpl(self, interaction, partial):
return await self.alert_acmpl(interaction, partial)
@streamalert_group.command(
name='pause',
description=(
"Pause a streamalert."
)
)
@appcmds.describe(
alert="Which alert do you want to pause?",
)
@appcmds.default_permissions(manage_channels=True)
async def streamalert_pause_cmd(self, ctx: LionContext, alert: str):
# Type guards
assert ctx.guild is not None, "Guild-only command has no guild ctx."
assert self.twitch is not None, "Twitch command run with no twitch obj."
assert ctx.interaction is not None, "Twitch command needs interaction ctx."
# Look up provided alert
sub_data = await self.resolve_alert(ctx.interaction, alert)
# Check user permissions for editing this alert
channel = ctx.guild.get_channel(sub_data.channelid)
permlevel = channel if channel else ctx.guild
if not permlevel.permissions_for(ctx.author).manage_channels:
await ctx.error_reply(
"Sorry, you need the `MANAGE_CHANNELS` permission "
"in this channel to edit the stream alert."
)
return
await sub_data.update(paused=True)
await ctx.reply("This alert is now paused!")
@streamalert_group.command(
name='unpause',
description=(
"Resume a streamalert."
)
)
@appcmds.describe(
alert="Which alert do you want to unpause?",
)
@appcmds.default_permissions(manage_channels=True)
async def streamalert_unpause_cmd(self, ctx: LionContext, alert: str):
# Type guards
assert ctx.guild is not None, "Guild-only command has no guild ctx."
assert self.twitch is not None, "Twitch command run with no twitch obj."
assert ctx.interaction is not None, "Twitch command needs interaction ctx."
# Look up provided alert
sub_data = await self.resolve_alert(ctx.interaction, alert)
# Check user permissions for editing this alert
channel = ctx.guild.get_channel(sub_data.channelid)
permlevel = channel if channel else ctx.guild
if not permlevel.permissions_for(ctx.author).manage_channels:
await ctx.error_reply(
"Sorry, you need the `MANAGE_CHANNELS` permission "
"in this channel to edit the stream alert."
)
return
await sub_data.update(paused=False)
await ctx.reply("This alert has been unpaused!")
@streamalert_group.command(
name='remove',
description=(
"Deactivate a streamalert entirely (see /streamalert pause to temporarily pause it)."
)
)
@appcmds.describe(
alert="Which alert do you want to remove?",
)
@appcmds.default_permissions(manage_channels=True)
async def streamalert_remove_cmd(self, ctx: LionContext, alert: str):
# Type guards
assert ctx.guild is not None, "Guild-only command has no guild ctx."
assert self.twitch is not None, "Twitch command run with no twitch obj."
assert ctx.interaction is not None, "Twitch command needs interaction ctx."
# Look up provided alert
sub_data = await self.resolve_alert(ctx.interaction, alert)
# Check user permissions for editing this alert
channel = ctx.guild.get_channel(sub_data.channelid)
permlevel = channel if channel else ctx.guild
if not permlevel.permissions_for(ctx.author).manage_channels:
await ctx.error_reply(
"Sorry, you need the `MANAGE_CHANNELS` permission "
"in this channel to edit the stream alert."
)
return
await sub_data.delete()
await ctx.reply("This alert has been deleted.")

View File

@@ -1,105 +0,0 @@
from data import Registry, RowModel
from data.columns import Integer, Bool, Timestamp, String
from data.models import WeakCache
from cachetools import TTLCache
class AlertsData(Registry):
class Streamer(RowModel):
"""
Schema
------
CREATE TABLE streamers(
userid BIGINT PRIMARY KEY,
login_name TEXT NOT NULL,
display_name TEXT NOT NULL
);
"""
_tablename_ = 'streamers'
_cache_ = {}
userid = Integer(primary=True)
login_name = String()
display_name = String()
class AlertChannel(RowModel):
"""
Schema
------
CREATE TABLE alert_channels(
subscriptionid SERIAL PRIMARY KEY,
guildid BIGINT NOT NULL,
channelid BIGINT NOT NULL,
streamerid BIGINT NOT NULL REFERENCES streamers (userid) ON DELETE CASCADE,
created_by BIGINT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
paused BOOLEAN NOT NULL DEFAULT FALSE,
end_delete BOOLEAN NOT NULL DEFAULT FALSE,
live_message TEXT,
end_message TEXT
);
CREATE INDEX alert_channels_guilds ON alert_channels (guildid);
CREATE UNIQUE INDEX alert_channels_channelid_streamerid ON alert_channels (channelid, streamerid);
"""
_tablename_ = 'alert_channels'
_cache_ = {}
subscriptionid = Integer(primary=True)
guildid = Integer()
channelid = Integer()
streamerid = Integer()
display_name = Integer()
created_by = Integer()
created_at = Timestamp()
paused = Bool()
end_delete = Bool()
live_message = String()
end_message = String()
class Stream(RowModel):
"""
Schema
------
CREATE TABLE streams(
streamid SERIAL PRIMARY KEY,
streamerid BIGINT NOT NULL REFERENCES streamers (userid) ON DELETE CASCADE,
start_at TIMESTAMPTZ NOT NULL,
twitch_stream_id BIGINT,
game_name TEXT,
title TEXT,
end_at TIMESTAMPTZ
);
"""
_tablename_ = 'streams'
_cache_ = WeakCache(TTLCache(maxsize=100, ttl=24*60*60))
streamid = Integer(primary=True)
streamerid = Integer()
start_at = Timestamp()
twitch_stream_id = Integer()
game_name = String()
title = String()
end_at = Timestamp()
class StreamAlert(RowModel):
"""
Schema
------
CREATE TABLE stream_alerts(
alertid SERIAL PRIMARY KEY,
streamid INTEGER NOT NULL REFERENCES streams (streamid) ON DELETE CASCADE,
subscriptionid INTEGER NOT NULL REFERENCES alert_channels (subscriptionid) ON DELETE CASCADE,
sent_at TIMESTAMPTZ NOT NULL,
messageid BIGINT NOT NULL,
resolved_at TIMESTAMPTZ
);
"""
_tablename_ = 'stream_alerts'
_cache_ = WeakCache(TTLCache(maxsize=1000, ttl=24*60*60))
alertid = Integer(primary=True)
streamid = Integer()
subscriptionid = Integer()
sent_at = Timestamp()
messageid = Integer()
resolved_at = Timestamp()

View File

@@ -1,369 +0,0 @@
import asyncio
import datetime as dt
from collections import namedtuple
from functools import wraps
from typing import TYPE_CHECKING
import discord
from discord.ui.button import button, Button, ButtonStyle
from discord.ui.select import select, Select, SelectOption, ChannelSelect
from meta import LionBot, conf
from utils.lib import MessageArgs, tabulate, utc_now
from utils.ui import MessageUI
from utils.ui.msgeditor import MsgEditor
from .settings import AlertSettings as Settings
from .settings import AlertConfig as Config
from .data import AlertsData
if TYPE_CHECKING:
from .cog import AlertCog
FakeStream = namedtuple(
'FakeStream',
["streamid", "streamerid", "start_at", "twitch_stream_id", "game_name", "title", "end_at"]
)
class AlertEditorUI(MessageUI):
setting_classes = (
Settings.AlertPaused,
Settings.AlertEndDelete,
Settings.AlertEndMessage,
Settings.AlertMessage,
Settings.AlertChannel,
)
def __init__(self, bot: LionBot, sub_data: AlertsData.AlertChannel, **kwargs):
super().__init__(**kwargs)
self.bot = bot
self.sub_data = sub_data
self.subid = sub_data.subscriptionid
self.cog: 'AlertCog' = bot.get_cog('AlertCog')
self.config = Config(self.subid, sub_data)
# ----- UI API -----
def preview_stream_data(self):
# TODO: Probably makes sense to factor this out to the cog
# Or even generate it in the formatters themselves
data = self.sub_data
return FakeStream(
-1,
data.streamerid,
utc_now() - dt.timedelta(hours=1),
-1,
"Art",
"Testing Go Live Message",
utc_now()
)
def call_and_refresh(self, func):
"""
Generate a wrapper which runs coroutine 'func' and then refreshes the UI.
"""
# TODO: Check whether the UI has finished interaction
@wraps(func)
async def wrapped(*args, **kwargs):
await func(*args, **kwargs)
await self.refresh()
return wrapped
# ----- UI Components -----
# Pause button
@button(label="PAUSE_PLACEHOLDER", style=ButtonStyle.blurple)
async def pause_button(self, press: discord.Interaction, pressed: Button):
await press.response.defer(thinking=True, ephemeral=True)
setting = self.config.get(Settings.AlertPaused.setting_id)
setting.value = not setting.value
await setting.write()
await self.refresh(thinking=press)
async def pause_button_refresh(self):
button = self.pause_button
if self.config.get(Settings.AlertPaused.setting_id).value:
button.label = "UnPause"
button.style = ButtonStyle.grey
else:
button.label = "Pause"
button.style = ButtonStyle.green
# Delete button
@button(label="Delete Alert", style=ButtonStyle.red)
async def delete_button(self, press: discord.Interaction, pressed: Button):
await press.response.defer(thinking=True, ephemeral=True)
await self.sub_data.delete()
embed = discord.Embed(
colour=discord.Colour.brand_green(),
description="Stream alert removed."
)
await press.edit_original_response(embed=embed)
await self.close()
# Close button
@button(emoji=conf.emojis.cancel, style=ButtonStyle.red)
async def close_button(self, press: discord.Interaction, pressed: Button):
await press.response.defer(thinking=False)
await self.close()
# Edit Alert button
@button(label="Edit Alert", style=ButtonStyle.blurple)
async def edit_alert_button(self, press: discord.Interaction, pressed: Button):
# Spawn MsgEditor for the live alert
await press.response.defer(thinking=True, ephemeral=True)
setting = self.config.get(Settings.AlertMessage.setting_id)
stream = self.preview_stream_data()
streamer = await self.cog.data.Streamer.fetch(self.sub_data.streamerid)
editor = MsgEditor(
self.bot,
setting.value,
callback=self.call_and_refresh(setting.editor_callback),
formatter=await setting.generate_formatter(self.bot, stream, streamer),
callerid=press.user.id
)
self._slaves.append(editor)
await editor.run(press)
# Edit End message
@button(label="Edit Ending Alert", style=ButtonStyle.blurple)
async def edit_end_button(self, press: discord.Interaction, pressed: Button):
# Spawn MsgEditor for the ending alert
await press.response.defer(thinking=True, ephemeral=True)
await self.open_end_editor(press)
async def open_end_editor(self, respond_to: discord.Interaction):
setting = self.config.get(Settings.AlertEndMessage.setting_id)
# Start from current live alert data if not set
if not setting.value:
alert_setting = self.config.get(Settings.AlertMessage.setting_id)
setting.value = alert_setting.value
stream = self.preview_stream_data()
streamer = await self.cog.data.Streamer.fetch(self.sub_data.streamerid)
editor = MsgEditor(
self.bot,
setting.value,
callback=self.call_and_refresh(setting.editor_callback),
formatter=await setting.generate_formatter(self.bot, stream, streamer),
callerid=respond_to.user.id
)
self._slaves.append(editor)
await editor.run(respond_to)
return editor
# Ending Mode Menu
@select(
cls=Select,
placeholder="Select action to take when the stream ends",
options=[SelectOption(label="DUMMY")],
min_values=0, max_values=1
)
async def ending_mode_menu(self, selection: discord.Interaction, selected: Select):
if not selected.values:
await selection.response.defer()
return
await selection.response.defer(thinking=True, ephemeral=True)
value = selected.values[0]
if value == '0':
# In Do Nothing case,
# Ensure Delete is off and custom edit message is unset
setting = self.config.get(Settings.AlertEndDelete.setting_id)
if setting.value:
setting.value = False
await setting.write()
setting = self.config.get(Settings.AlertEndMessage.setting_id)
if setting.value:
setting.value = None
await setting.write()
await self.refresh(thinking=selection)
elif value == '1':
# In Delete Alert case,
# Set the delete setting to True
setting = self.config.get(Settings.AlertEndDelete.setting_id)
if not setting.value:
setting.value = True
await setting.write()
await self.refresh(thinking=selection)
elif value == '2':
# In Edit Message case,
# Set the delete setting to False,
setting = self.config.get(Settings.AlertEndDelete.setting_id)
if setting.value:
setting.value = False
await setting.write()
# And open the edit message editor
await self.open_end_editor(selection)
await self.refresh()
async def ending_mode_menu_refresh(self):
# Build menu options
options = [
SelectOption(
label="Do Nothing",
description="Don't modify the live alert message.",
value="0",
),
SelectOption(
label="Delete Alert After Stream",
description="Delete the live alert message.",
value="1",
),
SelectOption(
label="Edit Alert After Stream",
description="Edit the live alert message to a custom message. Opens editor.",
value="2",
),
]
# Calculate the correct default
if self.config.get(Settings.AlertEndDelete.setting_id).value:
options[1].default = True
elif self.config.get(Settings.AlertEndMessage.setting_id).value:
options[2].default = True
self.ending_mode_menu.options = options
# Edit channel menu
@select(cls=ChannelSelect,
placeholder="Select Alert Channel",
channel_types=[discord.ChannelType.text, discord.ChannelType.voice],
min_values=0, max_values=1)
async def channel_menu(self, selection: discord.Interaction, selected):
if selected.values:
await selection.response.defer(thinking=True, ephemeral=True)
setting = self.config.get(Settings.AlertChannel.setting_id)
setting.value = selected.values[0]
await setting.write()
await self.refresh(thinking=selection)
else:
await selection.response.defer(thinking=False)
async def channel_menu_refresh(self):
# current = self.config.get(Settings.AlertChannel.setting_id).value
# TODO: Check if discord-typed menus can have defaults yet
# Impl in stable dpy, but not released to pip yet
...
# ----- UI Flow -----
async def make_message(self) -> MessageArgs:
streamer = await self.cog.data.Streamer.fetch(self.sub_data.streamerid)
if streamer is None:
raise ValueError("Streamer row does not exist in AlertEditor")
name = streamer.display_name
# Build relevant setting table
table_map = {}
table_map['Channel'] = self.config.get(Settings.AlertChannel.setting_id).formatted
table_map['Streamer'] = f"https://www.twitch.tv/{streamer.login_name}"
table_map['Paused'] = self.config.get(Settings.AlertPaused.setting_id).formatted
prop_table = '\n'.join(tabulate(*table_map.items()))
embed = discord.Embed(
colour=discord.Colour.dark_green(),
title=f"Stream Alert for {name}",
description=prop_table,
timestamp=utc_now()
)
message_setting = self.config.get(Settings.AlertMessage.setting_id)
message_desc_lines = [
f"An alert message will be posted to {table_map['Channel']}.",
f"Press `{self.edit_alert_button.label}`"
" to preview or edit the alert.",
"The following keys will be substituted in the alert message."
]
keytable = tabulate(*message_setting._subkey_desc.items())
for line in keytable:
message_desc_lines.append(f"> {line}")
embed.add_field(
name=f"When {name} goes live",
value='\n'.join(message_desc_lines),
inline=False
)
# Determine the ending behaviour
del_setting = self.config.get(Settings.AlertEndDelete.setting_id)
end_msg_setting = self.config.get(Settings.AlertEndMessage.setting_id)
if del_setting.value:
# Deleting
end_msg_desc = "The live alert message will be deleted."
...
elif end_msg_setting.value:
# Editing
lines = [
"The live alert message will edited to the configured message.",
f"Press `{self.edit_end_button.label}` to preview or edit the message.",
"The following substitution keys are supported "
"*in addition* to the live alert keys."
]
keytable = tabulate(
*[(k, v) for k, v in end_msg_setting._subkey_desc.items() if k not in message_setting._subkey_desc]
)
for line in keytable:
lines.append(f"> {line}")
end_msg_desc = '\n'.join(lines)
else:
# Doing nothing
end_msg_desc = "The live alert message will not be changed."
embed.add_field(
name=f"When {name} ends their stream",
value=end_msg_desc,
inline=False
)
return MessageArgs(embed=embed)
async def reload(self):
await self.sub_data.refresh()
# Note self.config references the sub_data, and doesn't need reloading.
async def refresh_layout(self):
to_refresh = (
self.pause_button_refresh(),
self.channel_menu_refresh(),
self.ending_mode_menu_refresh(),
)
await asyncio.gather(*to_refresh)
show_end_edit = (
not self.config.get(Settings.AlertEndDelete.setting_id).value
and
self.config.get(Settings.AlertEndMessage.setting_id).value
)
if not show_end_edit:
# Don't show edit end button
buttons = (
self.edit_alert_button,
self.pause_button, self.delete_button, self.close_button
)
else:
buttons = (
self.edit_alert_button, self.edit_end_button,
self.pause_button, self.delete_button, self.close_button
)
self.set_layout(
buttons,
(self.ending_mode_menu,),
(self.channel_menu,),
)

View File

@@ -1,276 +0,0 @@
from typing import Optional, Any
import json
from meta.LionBot import LionBot
from settings import ModelData
from settings.groups import SettingGroup, ModelConfig, SettingDotDict
from settings.setting_types import BoolSetting, ChannelSetting
from core.setting_types import MessageSetting
from babel.translator import LocalBabel
from utils.lib import recurse_map, replace_multiple, tabulate
from .data import AlertsData
babel = LocalBabel('streamalerts')
_p = babel._p
class AlertConfig(ModelConfig):
settings = SettingDotDict()
_model_settings = set()
model = AlertsData.AlertChannel
class AlertSettings(SettingGroup):
@AlertConfig.register_model_setting
class AlertMessage(ModelData, MessageSetting):
setting_id = 'alert_live_message'
_display_name = _p('', 'live_message')
_desc = _p(
'',
'Message sent to the channel when the streamer goes live.'
)
_long_desc = _p(
'',
'Message sent to the attached channel when the Twitch streamer goes live.'
)
_accepts = _p('', 'JSON formatted greeting message data')
_default = json.dumps({'content': "**{display_name}** just went live at {channel_link}"})
_model = AlertsData.AlertChannel
_column = AlertsData.AlertChannel.live_message.name
_subkey_desc = {
'{display_name}': "Twitch channel name (with capitalisation)",
'{login_name}': "Twitch channel login name (as in url)",
'{channel_link}': "Link to the live twitch channel",
'{stream_start}': "Numeric timestamp when stream went live",
'{stream_start_iso}': "ISO timestamp when stream went live (for embed timestamp)",
'{title}': "Title of the stream when it went live",
'{game}': "Game name of the stream when it went live"
}
# TODO: More stuff
@property
def update_message(self) -> str:
return "The go-live notification message has been updated!"
@classmethod
async def generate_formatter(cls, bot: LionBot, stream: AlertsData.Stream, streamer: AlertsData.Streamer, **kwargs):
"""
Generate a formatter function for this message
from the provided stream and streamer data.
The formatter function accepts and returns a message data dict.
"""
async def formatter(data_dict: Optional[dict[str, Any]]):
if not data_dict:
return None
mapping = {
'{display_name}': streamer.display_name,
'{login_name}': streamer.login_name,
'{channel_link}': f"https://www.twitch.tv/{streamer.login_name}",
'{stream_start}': int(stream.start_at.timestamp()),
'{stream_start_iso}': stream.start_at.isoformat(),
'{title}': stream.title,
'{game}': stream.game_name,
}
return recurse_map(
lambda loc, value: replace_multiple(value, mapping) if isinstance(value, str) else value,
data_dict,
)
return formatter
async def editor_callback(self, editor_data):
self.value = editor_data
await self.write()
def _desc_table(self, show_value: Optional[str] = None) -> list[tuple[str, str]]:
lines = super()._desc_table(show_value=show_value)
keytable = tabulate(*self._subkey_desc.items(), colon='')
expline = (
"The following placeholders will be substituted with their values."
)
keyfield = (
"Placeholders",
expline + '\n' + '\n'.join(f"> {line}" for line in keytable)
)
lines.append(keyfield)
return lines
@AlertConfig.register_model_setting
class AlertEndMessage(ModelData, MessageSetting):
"""
Custom ending message to edit the live alert to.
If not set, doesn't edit the alert.
"""
setting_id = 'alert_end_message'
_display_name = _p('', 'end_message')
_desc = _p(
'',
'Optional message to edit the live alert with when the stream ends.'
)
_long_desc = _p(
'',
"If set, and `end_delete` is not on, "
"the live alert will be edited with this custom message "
"when the stream ends."
)
_accepts = _p('', 'JSON formatted greeting message data')
_default = None
_model = AlertsData.AlertChannel
_column = AlertsData.AlertChannel.end_message.name
_subkey_desc = {
'{display_name}': "Twitch channel name (with capitalisation)",
'{login_name}': "Twitch channel login name (as in url)",
'{channel_link}': "Link to the live twitch channel",
'{stream_start}': "Numeric timestamp when stream went live",
'{stream_start_iso}': "ISO timestamp when stream went live (for embed timestamp)",
'{stream_end}': "Numeric timestamp when stream ended",
'{stream_end_iso}': "ISO timestamp when stream ended (for embed timestamp)",
'{title}': "Title of the stream when it went live",
'{game}': "Game name of the stream when it went live",
}
@property
def update_message(self) -> str:
if self.value:
return "The stream ending message has been updated."
else:
return "The stream ending message has been unset."
@classmethod
async def generate_formatter(cls, bot: LionBot, stream: AlertsData.Stream, streamer: AlertsData.Streamer, **kwargs):
"""
Generate a formatter function for this message
from the provided stream and streamer data.
The formatter function accepts and returns a message data dict.
"""
# TODO: Fake stream data maker (namedtuple?) for previewing
async def formatter(data_dict: Optional[dict[str, Any]]):
if not data_dict:
return None
mapping = {
'{display_name}': streamer.display_name,
'{login_name}': streamer.login_name,
'{channel_link}': f"https://www.twitch.tv/{streamer.login_name}",
'{stream_start}': int(stream.start_at.timestamp()),
'{stream_end}': int(stream.end_at.timestamp()),
'{stream_start_iso}': stream.start_at.isoformat(),
'{stream_end_iso}': stream.end_at.isoformat(),
'{title}': stream.title,
'{game}': stream.game_name,
}
return recurse_map(
lambda loc, value: replace_multiple(value, mapping) if isinstance(value, str) else value,
data_dict,
)
return formatter
async def editor_callback(self, editor_data):
self.value = editor_data
await self.write()
def _desc_table(self, show_value: Optional[str] = None) -> list[tuple[str, str]]:
lines = super()._desc_table(show_value=show_value)
keytable = tabulate(*self._subkey_desc.items(), colon='')
expline = (
"The following placeholders will be substituted with their values."
)
keyfield = (
"Placeholders",
expline + '\n' + '\n'.join(f"> {line}" for line in keytable)
)
lines.append(keyfield)
return lines
...
@AlertConfig.register_model_setting
class AlertEndDelete(ModelData, BoolSetting):
"""
Whether to delete the live alert after the stream ends.
"""
setting_id = 'alert_end_delete'
_display_name = _p('', 'end_delete')
_desc = _p(
'',
'Whether to delete the live alert after the stream ends.'
)
_long_desc = _p(
'',
"If enabled, the live alert message will be deleted when the stream ends. "
"This overrides the `end_message` setting."
)
_default = False
_model = AlertsData.AlertChannel
_column = AlertsData.AlertChannel.end_delete.name
@property
def update_message(self) -> str:
if self.value:
return "The live alert will be deleted at the end of the stream."
else:
return "The live alert will not be deleted when the stream ends."
@AlertConfig.register_model_setting
class AlertPaused(ModelData, BoolSetting):
"""
Whether this live alert is currently paused.
"""
setting_id = 'alert_paused'
_display_name = _p('', 'paused')
_desc = _p(
'',
"Whether the alert is currently paused."
)
_long_desc = _p(
'',
"Paused alerts will not trigger live notifications, "
"although the streams will still be tracked internally."
)
_default = False
_model = AlertsData.AlertChannel
_column = AlertsData.AlertChannel.paused.name
@property
def update_message(self):
if self.value:
return "This alert is now paused"
else:
return "This alert has been unpaused"
@AlertConfig.register_model_setting
class AlertChannel(ModelData, ChannelSetting):
"""
The channel associated to this alert.
"""
setting_id = 'alert_channel'
_display_name = _p('', 'channel')
_desc = _p(
'',
"The Discord channel this live alert will be sent in."
)
_long_desc = _desc
# Note that this cannot actually be None,
# as there is no UI pathway to unset the setting.
_default = None
_model = AlertsData.AlertChannel
_column = AlertsData.AlertChannel.channelid.name
@property
def update_message(self):
return f"This alert will now be posted to {self.value.channel.mention}"

1
src/modules/voicefix Submodule

Submodule src/modules/voicefix added at d1297ae986

View File

@@ -1,7 +0,0 @@
import logging
logger = logging.getLogger(__name__)
async def setup(bot):
from .cog import VoiceFixCog
await bot.add_cog(VoiceFixCog(bot))

View File

@@ -1,449 +0,0 @@
from collections import defaultdict
from typing import Optional
import asyncio
from cachetools import FIFOCache
import discord
from discord.abc import GuildChannel
from discord.ext import commands as cmds
from discord import app_commands as appcmds
from meta import LionBot, LionCog, LionContext
from meta.errors import ResponseTimedOut, SafeCancellation, UserInputError
from utils.ui import Confirm
from . import logger
from .data import LinkData
async def prepare_attachments(attachments: list[discord.Attachment]):
results = []
for attach in attachments:
try:
as_file = await attach.to_file(spoiler=attach.is_spoiler())
results.append(as_file)
except discord.HTTPException:
pass
return results
async def prepare_embeds(message: discord.Message):
embeds = [embed for embed in message.embeds if embed.type == 'rich']
if message.reference:
embed = discord.Embed(
colour=discord.Colour.dark_gray(),
description=f"Reply to {message.reference.jump_url}"
)
embeds.append(embed)
return embeds
class VoiceFixCog(LionCog):
def __init__(self, bot: LionBot):
self.bot = bot
self.data = bot.db.load_registry(LinkData())
# Map of linkids to list of channelids
self.link_channels = {}
# Map of channelids to linkids
self.channel_links = {}
# Map of channelids to initialised discord.Webhook
self.hooks = {}
# Map of messageid to list of (channelid, webhookmsg) pairs, for updates
self.message_cache = FIFOCache(maxsize=200)
# webhook msgid -> orig msgid
self.wmessages = FIFOCache(maxsize=600)
self.lock = asyncio.Lock()
async def cog_load(self):
await self.data.init()
await self.reload_links()
async def reload_links(self):
records = await self.data.channel_links.select_where()
channel_links = defaultdict(set)
link_channels = defaultdict(set)
for record in records:
linkid = record['linkid']
channelid = record['channelid']
channel_links[channelid].add(linkid)
link_channels[linkid].add(channelid)
channelids = list(channel_links.keys())
if channelids:
await self.data.LinkHook.fetch_where(channelid=channelids)
for channelid in channelids:
# Will hit cache, so don't need any more data queries
await self.fetch_webhook_for(channelid)
self.channel_links = {cid: tuple(linkids) for cid, linkids in channel_links.items()}
self.link_channels = {lid: tuple(cids) for lid, cids in link_channels.items()}
logger.info(
f"Loaded '{len(link_channels)}' channel links with '{len(self.channel_links)}' linked channels."
)
@LionCog.listener('on_message')
async def on_message(self, message: discord.Message):
# Don't need this because everything except explicit messages are webhooks now
# if self.bot.user and (message.author.id == self.bot.user.id):
# return
if message.webhook_id:
return
async with self.lock:
sent = []
linkids = self.channel_links.get(message.channel.id, ())
if linkids:
for linkid in linkids:
for channelid in self.link_channels[linkid]:
if channelid != message.channel.id:
if message.attachments:
files = await prepare_attachments(message.attachments)
else:
files = []
hook = self.hooks[channelid]
avatar = message.author.avatar or message.author.default_avatar
msg = await hook.send(
content=message.content,
wait=True,
username=message.author.display_name,
avatar_url=avatar.url,
embeds=await prepare_embeds(message),
files=files,
allowed_mentions=discord.AllowedMentions.none()
)
sent.append((channelid, msg))
self.wmessages[msg.id] = message.id
if sent:
# For easier lookup
self.wmessages[message.id] = message.id
sent.append((message.channel.id, message))
self.message_cache[message.id] = sent
logger.info(f"Forwarded message {message.id}")
@LionCog.listener('on_message_edit')
async def on_message_edit(self, before, after):
async with self.lock:
cached_sent = self.message_cache.pop(before.id, ())
new_sent = []
for cid, msg in cached_sent:
try:
if msg.id != before.id:
msg = await msg.edit(
content=after.content,
embeds=await prepare_embeds(after),
)
new_sent.append((cid, msg))
except discord.NotFound:
pass
if new_sent:
self.message_cache[after.id] = new_sent
@LionCog.listener('on_message_delete')
async def on_message_delete(self, message):
async with self.lock:
origid = self.wmessages.get(message.id, None)
if origid:
cached_sent = self.message_cache.pop(origid, ())
for _, msg in cached_sent:
try:
if msg.id != message.id:
await msg.delete()
except discord.NotFound:
pass
@LionCog.listener('on_reaction_add')
async def on_reaction_add(self, reaction: discord.Reaction, user: discord.User):
async with self.lock:
message = reaction.message
emoji = reaction.emoji
origid = self.wmessages.get(message.id, None)
if origid and reaction.count == 1:
cached_sent = self.message_cache.get(origid, ())
for _, msg in cached_sent:
# TODO: Would be better to have a Message and check the reactions
try:
if msg.id != message.id:
await msg.add_reaction(emoji)
except discord.HTTPException:
pass
async def fetch_webhook_for(self, channelid) -> discord.Webhook:
hook = self.hooks.get(channelid, None)
if hook is None:
row = await self.data.LinkHook.fetch(channelid)
if row is None:
channel = self.bot.get_channel(channelid)
if channel is None:
raise ValueError("Cannot find channel to create hook.")
hook = await channel.create_webhook(name="LabRat Channel Link")
await self.data.LinkHook.create(
channelid=channelid,
webhookid=hook.id,
token=hook.token,
)
else:
hook = discord.Webhook.partial(row.webhookid, row.token, client=self.bot)
self.hooks[channelid] = hook
return hook
@cmds.hybrid_group(
name='linker',
description="Base command group for the channel linker"
)
@appcmds.default_permissions(manage_channels=True)
async def linker_group(self, ctx: LionContext):
...
@linker_group.command(
name='link',
description="Create a new link, or add a channel to an existing link."
)
@appcmds.describe(
name="Name of the new or existing channel link.",
channel1="First channel to add to the link.",
channel2="Second channel to add to the link.",
channel3="Third channel to add to the link.",
channel4="Fourth channel to add to the link.",
channel5="Fifth channel to add to the link.",
channelid="Optionally add a channel by id (for e.g. cross-server links).",
)
async def linker_link(self, ctx: LionContext,
name: str,
channel1: Optional[discord.TextChannel | discord.VoiceChannel] = None,
channel2: Optional[discord.TextChannel | discord.VoiceChannel] = None,
channel3: Optional[discord.TextChannel | discord.VoiceChannel] = None,
channel4: Optional[discord.TextChannel | discord.VoiceChannel] = None,
channel5: Optional[discord.TextChannel | discord.VoiceChannel] = None,
channelid: Optional[str] = None,
):
if not ctx.interaction:
return
await ctx.interaction.response.defer(thinking=True)
# Check if link 'name' already exists, create if not
existing = await self.data.Link.fetch_where()
link_row = next((row for row in existing if row.name.lower() == name.lower()), None)
if link_row is None:
# Create
link_row = await self.data.Link.create(name=name)
link_channels = set()
created = True
else:
records = await self.data.channel_links.select_where(linkid=link_row.linkid)
link_channels = {record['channelid'] for record in records}
created = False
# Create webhooks and webhook rows on channels if required
maybe_channels = [
channel1, channel2, channel3, channel4, channel5,
]
if channelid and channelid.isdigit():
channel = self.bot.get_channel(int(channelid))
maybe_channels.append(channel)
channels = [channel for channel in maybe_channels if channel]
for channel in channels:
await self.fetch_webhook_for(channel.id)
# Insert or update the links
for channel in channels:
if channel.id not in link_channels:
await self.data.channel_links.insert(linkid=link_row.linkid, channelid=channel.id)
await self.reload_links()
if created:
embed = discord.Embed(
colour=discord.Colour.brand_green(),
title="Link Created",
description=(
"Created the link **{name}** and linked channels:\n{channels}"
).format(name=name, channels=', '.join(channel.mention for channel in channels))
)
else:
channelids = self.link_channels[link_row.linkid]
channelstr = ', '.join(f"<#{cid}>" for cid in channelids)
embed = discord.Embed(
colour=discord.Colour.brand_green(),
title="Channels Linked",
description=(
"Updated the link **{name}** to link the following channels:\n{channelstr}"
).format(name=link_row.name, channelstr=channelstr)
)
await ctx.reply(embed=embed)
@linker_group.command(
name='unlink',
description="Destroy a link, or remove a channel from a link."
)
@appcmds.describe(
name="Name of the link to destroy",
channel="Channel to remove from the link.",
)
async def linker_unlink(self, ctx: LionContext,
name: str, channel: Optional[GuildChannel] = None):
if not ctx.interaction:
return
# Get the link, error if it doesn't exist
existing = await self.data.Link.fetch_where()
link_row = next((row for row in existing if row.name.lower() == name.lower()), None)
if link_row is None:
raise UserInputError(
f"Link **{name}** doesn't exist!"
)
link_channelids = self.link_channels.get(link_row.linkid, ())
if channel is not None:
# If channel was given, remove channel from link and ack
if channel.id not in link_channelids:
raise UserInputError(
f"{channel.mention} is not linked in **{link_row.name}**!"
)
await self.data.channel_links.delete_where(channelid=channel.id, linkid=link_row.linkid)
embed = discord.Embed(
colour=discord.Colour.brand_green(),
title="Channel Unlinked",
description=f"{channel.mention} has been removed from **{link_row.name}**."
)
else:
# Otherwise, confirm link destroy, delete link row, and ack
channels = ', '.join(f"<#{cid}>" for cid in link_channelids)
confirm = Confirm(
f"Are you sure you want to remove the link **{link_row.name}**?\nLinked channels: {channels}",
ctx.author.id,
)
confirm.embed.colour = discord.Colour.red()
try:
result = await confirm.ask(ctx.interaction)
except ResponseTimedOut:
result = False
if not result:
raise SafeCancellation
embed = discord.Embed(
colour=discord.Colour.brand_green(),
title="Link removed",
description=f"Link **{link_row.name}** removed, the following channels were unlinked:\n{channels}"
)
await link_row.delete()
await self.reload_links()
await ctx.reply(embed=embed)
@linker_link.autocomplete('name')
async def _acmpl_link_name(self, interaction: discord.Interaction, partial: str):
"""
Autocomplete an existing link.
"""
existing = await self.data.Link.fetch_where()
names = [row.name for row in existing]
matching = [row.name for row in existing if partial.lower() in row.name.lower()]
if not matching:
choice = appcmds.Choice(
name=f"Create a new link '{partial}'",
value=partial
)
choices = [choice]
else:
choices = [
appcmds.Choice(
name=f"Link {name}",
value=name
)
for name in matching
]
return choices
@linker_unlink.autocomplete('name')
async def _acmpl_unlink_name(self, interaction: discord.Interaction, partial: str):
"""
Autocomplete an existing link.
"""
existing = await self.data.Link.fetch_where()
matching = [row.name for row in existing if partial.lower() in row.name.lower()]
if not matching:
choice = appcmds.Choice(
name=f"No existing links matching '{partial}'",
value=partial
)
choices = [choice]
else:
choices = [
appcmds.Choice(
name=f"Link {name}",
value=name
)
for name in matching
]
return choices
@linker_group.command(
name='links',
description="Display the existing channel links."
)
async def linker_links(self, ctx: LionContext):
if not ctx.interaction:
return
await ctx.interaction.response.defer(thinking=True)
links = await self.data.Link.fetch_where()
if not links:
embed = discord.Embed(
colour=discord.Colour.light_grey(),
title="No channel links have been set up!",
description="Create a new link and add channels with {linker}".format(
linker=self.bot.core.mention_cmd('linker link')
)
)
else:
embed = discord.Embed(
colour=discord.Colour.brand_green(),
title=f"Channel Links in {ctx.guild.name}",
)
for link in links:
channelids = self.link_channels.get(link.linkid, ())
channelstr = ', '.join(f"<#{cid}>" for cid in channelids)
embed.add_field(
name=f"Link **{link.name}**",
value=channelstr,
inline=False
)
# TODO: May want paging if over 25 links....
await ctx.reply(embed=embed)
@linker_group.command(
name="webhook",
description='Manually configure the webhook for a given channel.'
)
async def linker_webhook(self, ctx: LionContext, channel: discord.abc.GuildChannel, webhook: str):
if not ctx.interaction:
return
hook = discord.Webhook.from_url(webhook, client=self.bot)
existing = await self.data.LinkHook.fetch(channel.id)
if existing:
await existing.update(webhookid=hook.id, token=hook.token)
else:
await self.data.LinkHook.create(
channelid=channel.id,
webhookid=hook.id,
token=hook.token,
)
self.hooks[channel.id] = hook
await ctx.reply(f"Webhook for {channel.mention} updated!")

View File

@@ -1,39 +0,0 @@
from data import Registry, RowModel, Table
from data.columns import Integer, Bool, Timestamp, String
class LinkData(Registry):
class Link(RowModel):
"""
Schema
------
CREATE TABLE links(
linkid SERIAL PRIMARY KEY,
name TEXT
);
"""
_tablename_ = 'links'
_cache_ = {}
linkid = Integer(primary=True)
name = String()
channel_links = Table('channel_links')
class LinkHook(RowModel):
"""
Schema
------
CREATE TABLE channel_webhooks(
channelid BIGINT PRIMARY KEY,
webhookid BIGINT NOT NULL,
token TEXT NOT NULL
);
"""
_tablename_ = 'channel_webhooks'
_cache_ = {}
channelid = Integer(primary=True)
webhookid = Integer()
token = String()