Files
croccybot/src/modules/rolemenus/rolemenu.py

824 lines
31 KiB
Python

import json
from typing import Optional, TYPE_CHECKING
import datetime as dt
from collections import defaultdict
import discord
from discord.ui.select import Select, SelectOption
from discord.ui.button import Button, ButtonStyle
from meta import LionBot
from meta.errors import UserInputError, SafeCancellation
from utils.ui import MessageArgs, HookedItem, AsComponents
from utils.lib import utc_now, jumpto, emojikey
from babel.translator import ctx_locale
from modules.economy.cog import Economy, EconomyData, TransactionType
from .data import RoleMenuData as Data
from .data import MenuType
from .menuoptions import RoleMenuConfig
from .roleoptions import RoleMenuRoleConfig
from .templates import templates
from . import logger, babel
if TYPE_CHECKING:
from .cog import RoleMenuCog
_p, _np = babel._p, babel._np
MISSING = object()
DEFAULT_EMOJIS = "🍏 🍎 🍐 🍊 🍋 🍌 🍉 🍇 🫐 🍓 🍈 🍒 🍑 🥭 🍍 🥥 🥝 🍅 🍆 🥑 🫒 🥦 🥬 🫑 🥒".split()
DEFAULT_EMOJIS_PARTIALS = [discord.PartialEmoji(name=string) for string in DEFAULT_EMOJIS]
class MenuDropdown(HookedItem, Select):
...
class MenuButton(HookedItem, Button):
...
class RoleMenuRole:
def __init__(self, bot: LionBot, data: Data.RoleMenuRole):
self.bot = bot
self.data = data
self.config = RoleMenuRoleConfig(data.menuroleid, data)
@property
def custom_id(self):
return f"rmrid:{self.data.menuroleid}"
@property
def as_option(self):
return SelectOption(
emoji=self.config.emoji.data or None,
label=self.config.label.value,
value=str(self.data.menuroleid),
description=self.config.description.value,
)
@property
def as_button(self):
@MenuButton(
emoji=self.config.emoji.data or None,
label=self.config.label.value,
custom_id=self.custom_id,
style=ButtonStyle.grey
)
async def menu_button(press: discord.Interaction, pressed: Button):
await press.response.defer(thinking=True, ephemeral=True)
menu = await RoleMenu.fetch(self.bot, self.data.menuid)
await menu.interactive_selection(press, self.data.menuroleid)
return menu_button
class RoleMenu:
# Cache of messages with listening menus attached
attached_menus = defaultdict(dict) # guildid -> messageid -> menuid
# Registry of persistent Views for given menus
menu_views = {} # menuid -> View
# Persistent cache of menus
_menus = {} # menuid -> Menu
def __init__(self, bot: LionBot, data: Data.RoleMenu, rolemap):
self.bot = bot
self.cog: 'RoleMenuCog' = bot.get_cog('RoleMenuCog')
self.data = data
self.config = RoleMenuConfig(data.menuid, data)
self.rolemap: dict[int, RoleMenuRole] = rolemap
self.roles = list(rolemap.values())
self._message = MISSING
@property
def _view(self) -> Optional[discord.ui.View]:
"""
Active persistent View for this menu.
"""
return self.menu_views.get(self.data.menuid, None)
@property
def message(self):
if self._message is MISSING:
raise ValueError("Cannot access menu message before fetch")
else:
return self._message
@property
def jump_link(self):
if self.data.messageid:
link = jumpto(
self.data.guildid,
self.data.channelid,
self.data.messageid
)
else:
link = None
return link
@property
def managed(self):
"""
Whether the menu message is owned by the bot.
Returns True if the menu is unattached.
"""
if self._message is MISSING:
# Unknown, but send falsey value
managed = None
elif self._message is None:
managed = True
elif self._message.author is self._message.guild.me:
managed = True
else:
managed = False
return managed
@classmethod
async def fetch(cls, bot: LionBot, menuid: int):
"""
Fetch the requested menu by id, applying registry cache where possible.
"""
if (menu := cls._menus.get(menuid, None)) is None:
cog = bot.get_cog('RoleMenuCog')
data = await cog.data.RoleMenu.fetch(menuid)
role_rows = await cog.data.RoleMenuRole.fetch_where(menuid=menuid).order_by('menuroleid')
rolemap = {row.menuroleid: RoleMenuRole(bot, row) for row in role_rows}
menu = cls(bot, data, rolemap)
cls._menus[menuid] = menu
return menu
@classmethod
async def create(cls, bot: LionBot, **data_args):
cog = bot.get_cog('RoleMenuCog')
data = await cog.data.RoleMenu.create(
**data_args
)
menu = cls(bot, data, {})
cls._menus[data.menuid] = menu
await menu.attach()
return menu
async def fetch_message(self, refresh=False) -> Optional[discord.Message]:
"""
Fetch the message the menu is attached to.
"""
if refresh or self._message is MISSING:
if self.data.messageid is None:
_message = None
else:
_message = None
channelid = self.data.channelid
channel = self.bot.get_channel(channelid)
if channel is not None:
try:
_message = await channel.fetch_message(self.data.messageid)
except discord.NotFound:
pass
except discord.Forbidden:
pass
except discord.HTTPException:
# Something unexpected went wrong, leave the data alone for now
logger.exception("Something unexpected occurred while fetching the menu message")
raise
if _message is None:
await self.data.update(messageid=None)
self._message = _message
return self._message
async def update_raw(self):
"""
Updates the saved raw message data for non-owned menus.
"""
message = await self.fetch_message()
if not self.managed and message is not None:
message_data = {}
message_data['content'] = message.content
if message.embeds:
message_data['embed'] = message.embeds[0].to_dict()
rawmessage = json.dumps(message_data)
if rawmessage != self.data.rawmessage:
await self.data.update(rawmessage=rawmessage)
def emoji_map(self):
emoji_map = {}
for mrole in self.roles:
emoji = mrole.config.emoji.as_partial
if emoji is not None:
emoji_map[emoji] = mrole.data.menuroleid
return emoji_map
async def attach(self):
"""
Start listening for menu selection events.
"""
if self.data.messageid:
self.attached_menus[self.data.guildid][self.data.messageid] = self.data.menuid
if self.data.menutype is not MenuType.REACTION:
view = await self.make_view()
if view is not None:
self.menu_views[self.data.menuid] = view
self.bot.add_view(view)
elif self.data.menutype is MenuType.REACTION:
pass
def detach(self):
"""
Stop listening for menu selection events.
"""
view = self.menu_views.pop(self.data.menuid, None)
if view is not None:
view.stop()
if (mid := self.data.messageid) is not None:
self.attached_menus[self.data.guildid].pop(mid, None)
async def delete(self):
self.detach()
self._menus.pop(self.data.menuid, None)
# Delete the menu, along with the message if it is self-managed.
message = await self.fetch_message()
if message and message.author is message.guild.me:
try:
await message.delete()
except discord.HTTPException:
# This should never really fail since we own the message
# But it is possible the message was externally deleted and we never updated message cache
# So just ignore quietly
pass
# Cancel any relevant expiry tasks (before we delete data which will delete the equip rows)
expiring = await self.cog.data.RoleMenuHistory.fetch_expiring_where(menuid=self.data.menuid)
if expiring:
await self.cog.cancel_expiring_tasks(*(row.equipid for row in expiring))
await self.data.delete()
async def reload_roles(self):
"""
Fetches and re-initialises the MenuRoles for this Menu.
"""
roledata = self.bot.get_cog('RoleMenuCog').data.RoleMenuRole
role_rows = await roledata.fetch_where(menuid=self.data.menuid).order_by('menuroleid')
self.rolemap = {row.menuroleid: RoleMenuRole(self.bot, row) for row in role_rows}
self.roles = list(self.rolemap.values())
async def update_message(self):
"""
Update the (managed) message the menu is attached to.
Does nothing if there is not message or it is not bot-managed.
"""
self.detach()
message = await self.fetch_message()
if message is not None and self.managed:
args = await self.make_args()
view = await self.make_view()
try:
await message.edit(**args.edit_args, view=view)
await self.attach()
except discord.NotFound:
await self.data.update(messageid=None)
self._message = None
except discord.HTTPException as e:
t = self.bot.translator.t
error = discord.Embed(
colour=discord.Colour.brand_red(),
title=t(_p(
'rolemenu|menu_message|error|title',
'ROLE MENU DISPLAY ERROR'
)),
description=t(_p(
'rolemenu|menu_message|error|desc',
"A critical error occurred trying to display this role menu.\n"
"Error: `{error}`."
)).format(error=e.text)
)
try:
await message.edit(
embed=error
)
except discord.HTTPException:
# There's really something wrong
# Nothing we can safely do.
pass
pass
async def update_reactons(self):
"""
Attempt to update the reactions on a REACTION type menu.
Does nothing if the menu is not REACTION type.
Will raise `SafeCancellation` and stop if a reaction fails.
"""
message = await self.fetch_message()
if message is not None and self.data.menutype is MenuType.REACTION:
# First remove any of my reactions that are no longer relevant
required = {
emojikey(mrole.config.emoji.as_partial) for mrole in self.roles if mrole.data.emoji
}
for reaction in message.reactions:
if reaction.me and (emojikey(reaction.emoji) not in required):
try:
await message.remove_reaction(reaction.emoji, message.guild.me)
except discord.HTTPException:
pass
# Then add any extra reactions that are missing
existing_mine = {
emojikey(reaction.emoji) for reaction in message.reactions if reaction.me
}
existing = {
emojikey(reaction.emoji) for reaction in message.reactions
}
for mrole in self.roles:
emoji = mrole.config.emoji.as_partial
if emoji is not None and emojikey(emoji) not in existing_mine:
try:
await message.add_reaction(emoji)
except discord.HTTPException:
if emojikey(emoji) not in existing:
t = self.bot.translator.t
raise SafeCancellation(
t(_p(
'rolemenu|update_reactions|error',
"Could not add the {emoji} reaction, perhaps I do not "
"have access to this emoji! Reactions will need to be added "
"manually."
)).format(emoji=emoji)
)
else:
# We can't react with this emoji, but it does exist on the message
# Just ignore the error and continue
continue
async def repost_to(self, destination):
# Set the current message to be deleted if it is a managed message.
# Don't delete until after we have successfully moved the menu though.
if self.managed and (message := self.message):
to_delete = message
else:
to_delete = None
# Now try and post the message in the new channel
args = await self.make_args()
view = await self.make_view()
new_message = await destination.send(**args.send_args, view=view or discord.utils.MISSING)
# Stop listening to events on the current message (if it exists)
self.detach()
await self.data.update(channelid=destination.id, messageid=new_message.id)
self._message = new_message
await self.attach()
if to_delete:
# Attempt to delete the original message
try:
await to_delete.delete()
except discord.HTTPException:
pass
async def _make_button_view(self):
buttons = [mrole.as_button for mrole in self.roles]
return AsComponents(*buttons, timeout=None)
async def _make_dropdown_view(self):
t = self.bot.translator.t
placeholder = t(_p(
'ui:rolemenu_dropdown|placeholder',
"Select Roles"
))
options = [mrole.as_option for mrole in self.roles]
@MenuDropdown(
custom_id=f"menuid:{self.data.menuid}",
placeholder=placeholder,
options=options,
min_values=0, max_values=1
)
async def menu_dropdown(selection: discord.Interaction, selected: Select):
if selected.values:
await selection.response.defer(thinking=True, ephemeral=True)
menuroleid = int(selected.values[0])
menu = await self.fetch(self.bot, self.data.menuid)
await menu.interactive_selection(selection, menuroleid)
else:
await selection.response.defer(thinking=False)
return AsComponents(menu_dropdown, timeout=None)
async def make_view(self) -> Optional[discord.ui.View]:
"""
Create the appropriate discord.View for this menu.
May be None if the menu has no roles or is a REACTION menu.
"""
lguild = await self.bot.core.lions.fetch_guild(self.data.guildid)
ctx_locale.set(lguild.locale)
if not self.roles:
view = None
elif self.data.menutype is MenuType.REACTION:
view = None
elif self.data.menutype is MenuType.DROPDOWN:
view = await self._make_dropdown_view()
elif self.data.menutype is MenuType.BUTTON:
view = await self._make_button_view()
return view
async def make_args(self) -> MessageArgs:
"""
Generate the message arguments for this menu.
"""
if (tid := self.data.templateid) is not None:
# Apply template
template = templates[tid]
args = await template.render_menu(self)
else:
raw = self.data.rawmessage
data = json.loads(raw)
args = MessageArgs(
content=data.get('content', ''),
embed=discord.Embed.from_dict(data['embed']) if 'embed' in data else None
)
return args
def unused_emojis(self, include_defaults=True):
"""
Fetch the next emoji on the message that is not already assigned to a role.
Checks custom emojis by PartialEmoji equality (i.e. by id).
If no reaction exists, uses a default emoji.
"""
if self.message:
message_emojis = [reaction.emoji for reaction in self.message.reactions]
else:
message_emojis = []
if self.data.menutype is MenuType.REACTION:
valid_emojis = (*message_emojis, *DEFAULT_EMOJIS_PARTIALS)
else:
valid_emojis = message_emojis
menu_emojis = {emojikey(mrole.config.emoji.as_partial) for mrole in self.roles}
for emoji in valid_emojis:
if emojikey(emoji) not in menu_emojis:
yield str(emoji)
async def _handle_positive(self, lion, member: discord.Member, mrole: RoleMenuRole) -> discord.Embed:
t = self.bot.translator.t
guild = member.guild
role = guild.get_role(mrole.data.roleid)
if not role:
raise ValueError("Calling _handle_positive without a valid role.")
price = mrole.config.price.value
obtainable = self.config.obtainable.value
remove_line = ''
if obtainable is not None:
# Check shared roles
menu_roles = {mrole.data.roleid: mrole for mrole in self.roles}
common = [role for role in member.roles if role.id in menu_roles]
if len(common) >= obtainable:
swap = None
if len(common) == 1 and not self.config.sticky.value:
swap = menu_roles[common[0].id]
# Check if LC will be lost by exchanging the role
if (swap.config.price.value) > 0 and not self.config.refunds.value:
swap = None
if swap is not None:
# Do remove
try:
remove_embed = await self._handle_negative(lion, member, swap)
remove_line = remove_embed.description
except UserInputError:
# If we failed to remove for some reason, pretend we didn't try
swap = None
if swap is None:
error = t(_np(
'rolemenu|select|error:max_obtainable',
"You can own at most one role from this menu! You currently own:",
"You can own at most **{count}** roles from this menu! You currently own:",
obtainable
)).format(count=obtainable)
error = '\n'.join((error, *(role.mention for role in common)))
raise UserInputError(error)
if price:
# Check member balance
# TODO: More transaction safe (or rather check again after transaction)
await lion.data.refresh()
balance = lion.data.coins
if balance < price:
raise UserInputError(
t(_p(
'rolemenu|select|error:insufficient_funds',
"The role **{role}** costs {coin}**{cost}**,"
"but you only have {coin}**{balance}**!"
)).format(
role=role.name,
coin=self.bot.config.emojis.coin,
cost=price,
balance=balance,
)
)
try:
await member.add_roles(role)
except discord.Forbidden:
raise UserInputError(
t(_p(
'rolemenu|select|error:perms',
"I don't have enough permissions to give you this role!"
))
)
except discord.HTTPException:
raise UserInputError(
t(_p(
'rolemenu|select|error:discord',
"An unknown error occurred while assigning your role! "
"Please try again later."
))
)
now = utc_now()
# Create transaction if applicable
if price:
economy: Economy = self.bot.get_cog('Economy')
tx = await economy.data.Transaction.execute_transaction(
transaction_type=TransactionType.OTHER,
guildid=guild.id, actorid=member.id,
from_account=member.id, to_account=None,
amount=price
)
tid = tx.transactionid
else:
tid = None
# Calculate expiry
duration = mrole.config.duration.value
if duration is not None:
expiry = now + dt.timedelta(seconds=duration)
else:
expiry = None
# Add to equip history
equip = await self.cog.data.RoleMenuHistory.create(
menuid=self.data.menuid, roleid=role.id,
userid=member.id,
obtained_at=now,
transactionid=tid,
expires_at=expiry
)
await self.cog.schedule_expiring(equip)
# Ack the selection
embed = discord.Embed(
colour=discord.Colour.brand_green(),
title=t(_p(
'rolemenu|select|success|title',
"Role equipped"
))
)
if price:
embed.description = t(_p(
'rolemenu|select|success:purchase|desc',
"You have purchased the role **{role}** for {coin}**{amount}**"
)).format(role=role.name, coin=self.bot.config.emojis.coin, amount=price)
else:
embed.description = t(_p(
'rolemenu|select|success:nopurchase|desc',
"You have equipped **{role}**"
)).format(role=role.name)
if expiry is not None:
embed.description += '\n' + t(_p(
'rolemenu|select|expires_at',
"The role will expire at {timestamp}."
)).format(
timestamp=discord.utils.format_dt(expiry)
)
if remove_line:
embed.description = '\n'.join((remove_line, embed.description))
lguild = await self.bot.core.lions.fetch_guild(self.data.guildid)
lguild.log_event(
title=t(_p(
'rolemenu|eventlog|event:role_equipped|title',
"Member equipped role from role menu"
)),
description=t(_p(
'rolemenu|eventlog|event:role_equipped|desc',
"{member} equipped {role} from {menu}"
)).format(
member=member.mention,
role=role.mention,
menu=self.jump_link
),
roles_given=role.mention,
price=price,
expiry=discord.utils.format_dt(expiry) if expiry is not None else None,
)
return embed
async def _handle_negative(self, lion, member: discord.Member, mrole: RoleMenuRole) -> discord.Embed:
t = self.bot.translator.t
guild = member.guild
role = guild.get_role(mrole.data.roleid)
if not role:
raise ValueError("Calling _handle_negative without a valid role.")
if self.config.sticky.value:
# Cannot deselect
raise UserInputError(
t(_p(
'rolemenu|deselect|error:sticky',
"**{role}** is a sticky role, you cannot remove it with this menu!"
)).format(role=role.name)
)
# Remove the role
try:
await member.remove_roles(role)
except discord.Forbidden:
raise UserInputError(
t(_p(
'rolemenu|deselect|error:perms',
"I don't have enough permissions to remove this role from you!"
))
)
except discord.HTTPException:
raise UserInputError(
t(_p(
'rolemenu|deselect|error:discord',
"An unknown error occurred removing your role! Please try again later."
))
)
# Update history
now = utc_now()
history = await self.cog.data.RoleMenuHistory.table.update_where(
menuid=self.data.menuid,
roleid=role.id,
userid=member.id,
removed_at=None,
).set(removed_at=now)
await self.cog.cancel_expiring_tasks(*(row['equipid'] for row in history))
# Refund if required
transactionids = [row['transactionid'] for row in history]
if self.config.refunds.value and any(transactionids):
transactionids = [tid for tid in transactionids if tid]
economy: Economy = self.bot.get_cog('Economy')
refunded = await economy.data.Transaction.refund_transactions(*transactionids)
total_refund = sum(row.amount + row.bonus for row in refunded)
else:
total_refund = 0
# Ack the removal
embed = discord.Embed(
colour=discord.Colour.brand_green(),
title=t(_p(
'rolemenu|deslect|success|title',
"Role removed"
))
)
if total_refund > 0:
embed.description = t(_p(
'rolemenu|deselect|success:refund|desc',
"You have removed **{role}**, and been refunded {coin} **{amount}**."
)).format(role=role.name, coin=self.bot.config.emojis.coin, amount=total_refund)
if total_refund < 0:
# TODO: Consider disallowing them from removing roles if their balance would go negative
embed.description = t(_p(
'rolemenu|deselect|success:negrefund|desc',
"You have removed **{role}**, and have lost {coin} **{amount}**."
)).format(role=role.name, coin=self.bot.config.emojis.coin, amount=-total_refund)
else:
embed.description = t(_p(
'rolemenu|deselect|success:norefund|desc',
"You have unequipped **{role}**."
)).format(role=role.name)
lguild = await self.bot.core.lions.fetch_guild(self.data.guildid)
lguild.log_event(
title=t(_p(
'rolemenu|eventlog|event:role_unequipped|title',
"Member unequipped role from role menu"
)),
description=t(_p(
'rolemenu|eventlog|event:role_unequipped|desc',
"{member} unequipped {role} from {menu}"
)).format(
member=member.mention,
role=role.mention,
menu=self.jump_link,
),
roles_given=role.mention,
refund=total_refund,
)
return embed
async def _handle_selection(self, lion, member: discord.Member, menuroleid: int):
lock_key = ('rmenu', member.id, member.guild.id)
async with self.bot.idlock(lock_key):
mrole = self.rolemap.get(menuroleid, None)
if mrole is None:
raise ValueError(
f"Attempt to process event for invalid menuroleid {menuroleid}, THIS SHOULD NOT HAPPEN."
)
guild = member.guild
t = self.bot.translator.t
role = guild.get_role(mrole.data.roleid)
if role is None:
# This role no longer exists, nothing we can do
raise UserInputError(
t(_p(
'rolemenu|error:role_gone',
"The role **{name}** no longer exists!"
)).format(name=mrole.data.label)
)
required = self.config.required_role.data
if required is not None:
# Check member has the required role
if required not in [role.id for role in member.roles]:
role = guild.get_role(required)
name = role.name if role else required
raise UserInputError(
t(_p(
'rolemenu|select|error:required_role',
"You need to have the role **{role}** required to use this menu!"
)).format(role=name)
)
if role in member.roles:
# Member already has the role, deselection case.
return await self._handle_negative(lion, member, mrole)
else:
# Member does not have the role, selection case.
return await self._handle_positive(lion, member, mrole)
async def interactive_selection(self, interaction: discord.Interaction, menuroleid: int):
"""
Handle a component interaction callback for this menu.
Assumes the interaction has already been responded to (ephemerally).
"""
member = interaction.user
guild = interaction.guild
if not isinstance(member, discord.Member):
# Occasionally Discord drops the ball on user type. This manually fetches the guild member.
member = await guild.fetch_member(member.id)
# Localise to the member's locale
lion = await self.bot.core.lions.fetch_member(guild.id, member.id, member=member)
ctx_locale.set(lion.private_locale(interaction))
result = await self._handle_selection(lion, member, menuroleid)
await interaction.edit_original_response(embed=result)
async def handle_reaction(self, reaction_payload: discord.RawReactionActionEvent):
"""
Handle a raw reaction event on a message the menu is attached to.
Ignores the event if it is not relevant.
"""
guild = self.bot.get_guild(reaction_payload.guild_id)
channel = self.bot.get_channel(reaction_payload.channel_id)
if guild and channel:
emoji_map = self.emoji_map()
menuroleid = emoji_map.get(reaction_payload.emoji, None)
if menuroleid is not None:
member = reaction_payload.member
if not member:
member = await guild.fetch_member(reaction_payload.user_id)
if member.bot:
return
lion = await self.bot.core.lions.fetch_member(guild.id, member.id, member=member)
ctx_locale.set(lion.private_locale())
try:
embed = await self._handle_selection(lion, member, menuroleid)
except UserInputError as e:
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description=e.msg
)
t = self.bot.translator.t
content = t(_p(
'rolemenu|content:reactions',
"[Click here]({jump_link}) to jump back."
)).format(jump_link=jumpto(guild.id, channel.id, reaction_payload.message_id))
try:
await member.send(content=content, embed=embed)
except discord.HTTPException:
pass