Files
croccybot/bot/utils/interactive.py
Conatum 459a728968 Core user data and leaderboard commands.
Added flexibility to data `update_where`.
Added interactive utils, with improved pager.
Added user data table, with caching and transactional interface.
Added `topcoins` command to `Economy`
Added `top` command to `Study`
2021-08-26 22:34:46 +03:00

462 lines
16 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import discord
from cmdClient import Context
from cmdClient.lib import UserCancelled, ResponseTimedOut
from .lib import paginate_list
# TODO: Interactive locks
cancel_emoji = ''
number_emojis = (
'1', '2', '3', '4', '5', '6', '7', '8', '9'
)
async def discord_shield(coro):
try:
await coro
except discord.HTTPException:
pass
@Context.util
async def cancellable(ctx, msg, add_reaction=True, cancel_message=None, timeout=300):
"""
Add a cancellation reaction to the given message.
Pressing the reaction triggers cancellation of the original context, and a UserCancelled-style error response.
"""
# TODO: Not consistent with the exception driven flow, make a decision here?
# Add reaction
if add_reaction and cancel_emoji not in (str(r.emoji) for r in msg.reactions):
try:
await msg.add_reaction(cancel_emoji)
except discord.HTTPException:
return
# Define cancellation function
async def _cancel():
try:
await ctx.client.wait_for(
'reaction_add',
timeout=timeout,
check=lambda r, u: (u == ctx.author
and r.message == msg
and str(r.emoji) == cancel_emoji)
)
except asyncio.TimeoutError:
pass
else:
await ctx.client.active_command_response_cleaner(ctx)
if cancel_message:
await ctx.error_reply(cancel_message)
else:
try:
await ctx.msg.add_reaction(cancel_emoji)
except discord.HTTPException:
pass
[task.cancel() for task in ctx.tasks]
# Launch cancellation task
task = asyncio.create_task(_cancel())
ctx.tasks.append(task)
return task
@Context.util
async def listen_for(ctx, allowed_input=None, timeout=120, lower=True, check=None):
"""
Listen for a one of a particular set of input strings,
sent in the current channel by `ctx.author`.
When found, return the message containing them.
Parameters
----------
allowed_input: Union(List(str), None)
List of strings to listen for.
Allowed to be `None` precisely when a `check` function is also supplied.
timeout: int
Number of seconds to wait before timing out.
lower: bool
Whether to shift the allowed and message strings to lowercase before checking.
check: Function(message) -> bool
Alternative custom check function.
Returns: discord.Message
The message that was matched.
Raises
------
cmdClient.lib.ResponseTimedOut:
Raised when no messages matching the given criteria are detected in `timeout` seconds.
"""
# Generate the check if it hasn't been provided
if not check:
# Quick check the arguments are sane
if not allowed_input:
raise ValueError("allowed_input and check cannot both be None")
# Force a lower on the allowed inputs
allowed_input = [s.lower() for s in allowed_input]
# Create the check function
def check(message):
result = (message.author == ctx.author)
result = result and (message.channel == ctx.ch)
result = result and ((message.content.lower() if lower else message.content) in allowed_input)
return result
# Wait for a matching message, catch and transform the timeout
try:
message = await ctx.client.wait_for('message', check=check, timeout=timeout)
except asyncio.TimeoutError:
raise ResponseTimedOut("Session timed out waiting for user response.") from None
return message
@Context.util
async def selector(ctx, header, select_from, timeout=120, max_len=20):
"""
Interactive routine to prompt the `ctx.author` to select an item from a list.
Returns the list index that was selected.
Parameters
----------
header: str
String to put at the top of each page of selection options.
Intended to be information about the list the user is selecting from.
select_from: List(str)
The list of strings to select from.
timeout: int
The number of seconds to wait before throwing `ResponseTimedOut`.
max_len: int
The maximum number of items to display on each page.
Decrease this if the items are long, to avoid going over the char limit.
Returns
-------
int:
The index of the list entry selected by the user.
Raises
------
cmdClient.lib.UserCancelled:
Raised if the user manually cancels the selection.
cmdClient.lib.ResponseTimedOut:
Raised if the user fails to respond to the selector within `timeout` seconds.
"""
# Handle improper arguments
if len(select_from) == 0:
raise ValueError("Selection list passed to `selector` cannot be empty.")
# Generate the selector pages
footer = "Please reply with the number of your selection, or press {} to cancel.".format(cancel_emoji)
list_pages = paginate_list(select_from, block_length=max_len)
pages = ["\n".join([header, page, footer]) for page in list_pages]
# Post the pages in a paged message
out_msg = await ctx.pager(pages, add_cancel=True)
cancel_task = await ctx.cancellable(out_msg, add_reaction=False, timeout=None)
if len(select_from) <= 5:
for i, _ in enumerate(select_from):
asyncio.create_task(discord_shield(out_msg.add_reaction(number_emojis[i])))
# Build response tasks
valid_input = [str(i+1) for i in range(0, len(select_from))] + ['c', 'C']
listen_task = asyncio.create_task(ctx.listen_for(valid_input, timeout=None))
emoji_task = asyncio.create_task(ctx.client.wait_for(
'reaction_add',
check=lambda r, u: (u == ctx.author
and r.message == out_msg
and str(r.emoji) in number_emojis)
))
# Wait for the response tasks
done, pending = await asyncio.wait(
(listen_task, emoji_task),
timeout=timeout,
return_when=asyncio.FIRST_COMPLETED
)
# Cleanup
try:
await out_msg.delete()
except discord.HTTPException:
pass
# Handle different return cases
if listen_task in done:
emoji_task.cancel()
result_msg = listen_task.result()
try:
await result_msg.delete()
except discord.HTTPException:
pass
if result_msg.content.lower() == 'c':
raise UserCancelled("Selection cancelled!")
result = int(result_msg.content) - 1
elif emoji_task in done:
listen_task.cancel()
reaction, _ = emoji_task.result()
result = number_emojis.index(str(reaction.emoji))
elif cancel_task in done:
# Manually cancelled case.. the current task should have been cancelled
# Raise UserCancelled in case the task wasn't cancelled for some reason
raise UserCancelled("Selection cancelled!")
elif not done:
# Timeout case
raise ResponseTimedOut("Selector timed out waiting for a response.")
# Finally cancel the canceller and return the provided index
cancel_task.cancel()
return result
@Context.util
async def pager(ctx, pages, locked=True, start_at=0, add_cancel=False, **kwargs):
"""
Shows the user each page from the provided list `pages` one at a time,
providing reactions to page back and forth between pages.
This is done asynchronously, and returns after displaying the first page.
Parameters
----------
pages: List(Union(str, discord.Embed))
A list of either strings or embeds to display as the pages.
locked: bool
Whether only the `ctx.author` should be able to use the paging reactions.
kwargs: ...
Remaining keyword arguments are transparently passed to the reply context method.
Returns: discord.Message
This is the output message, returned for easy deletion.
"""
# Handle broken input
if len(pages) == 0:
raise ValueError("Pager cannot page with no pages!")
# Post first page. Method depends on whether the page is an embed or not.
if isinstance(pages[start_at], discord.Embed):
out_msg = await ctx.reply(embed=pages[start_at], **kwargs)
else:
out_msg = await ctx.reply(pages[start_at], **kwargs)
# Run the paging loop if required
if len(pages) > 1:
task = asyncio.create_task(_pager(ctx, out_msg, pages, locked, start_at, add_cancel, **kwargs))
ctx.tasks.append(task)
elif add_cancel:
await out_msg.add_reaction(cancel_emoji)
# Return the output message
return out_msg
async def _pager(ctx, out_msg, pages, locked, start_at, add_cancel, **kwargs):
"""
Asynchronous initialiser and loop for the `pager` utility above.
"""
# Page number
page = start_at
# Add reactions to the output message
next_emoji = ""
prev_emoji = ""
try:
await out_msg.add_reaction(prev_emoji)
if add_cancel:
await out_msg.add_reaction(cancel_emoji)
await out_msg.add_reaction(next_emoji)
except discord.Forbidden:
# We don't have permission to add paging emojis
# Die as gracefully as we can
if ctx.guild:
perms = ctx.ch.permissions_for(ctx.guild.me)
if not perms.add_reactions:
await ctx.error_reply(
"Cannot page results because I do not have the `add_reactions` permission!"
)
elif not perms.read_message_history:
await ctx.error_reply(
"Cannot page results because I do not have the `read_message_history` permission!"
)
else:
await ctx.error_reply(
"Cannot page results due to insufficient permissions!"
)
else:
await ctx.error_reply(
"Cannot page results!"
)
return
# Check function to determine whether a reaction is valid
def reaction_check(reaction, user):
result = reaction.message.id == out_msg.id
result = result and str(reaction.emoji) in [next_emoji, prev_emoji]
result = result and not (user.id == ctx.client.user.id)
result = result and not (locked and user != ctx.author)
return result
# Check function to determine if message has a page number
def message_check(message):
result = message.channel.id == ctx.ch.id
result = result and not (locked and message.author != ctx.author)
result = result and message.content.lower().startswith('p')
result = result and message.content[1:].isdigit()
result = result and 1 <= int(message.content[1:]) <= len(pages)
return result
# Begin loop
while True:
# Wait for a valid reaction or message, break if we time out
reaction_task = asyncio.create_task(
ctx.client.wait_for('reaction_add', check=reaction_check)
)
message_task = asyncio.create_task(
ctx.client.wait_for('message', check=message_check)
)
done, pending = await asyncio.wait(
(reaction_task, message_task),
timeout=300,
return_when=asyncio.FIRST_COMPLETED
)
if done:
if reaction_task in done:
# Cancel the message task and collect the reaction result
message_task.cancel()
reaction, user = reaction_task.result()
# Attempt to remove the user's reaction, silently ignore errors
asyncio.ensure_future(out_msg.remove_reaction(reaction.emoji, user))
# Change the page number
page += 1 if reaction.emoji == next_emoji else -1
page %= len(pages)
elif message_task in done:
# Cancel the reaction task and collect the message result
reaction_task.cancel()
message = message_task.result()
# Attempt to delete the user's message, silently ignore errors
asyncio.ensure_future(message.delete())
# Move to the correct page
page = int(message.content[1:]) - 1
# Edit the message with the new page
active_page = pages[page]
if isinstance(active_page, discord.Embed):
await out_msg.edit(embed=active_page, **kwargs)
else:
await out_msg.edit(content=active_page, **kwargs)
else:
# No tasks finished, so we must have timed out, or had an exception.
# Break the loop and clean up
break
# Clean up by removing the reactions
try:
await out_msg.clear_reactions()
except discord.Forbidden:
try:
await out_msg.remove_reaction(next_emoji, ctx.client.user)
await out_msg.remove_reaction(prev_emoji, ctx.client.user)
except discord.NotFound:
pass
except discord.NotFound:
pass
@Context.util
async def input(ctx, msg="", timeout=120):
"""
Listen for a response in the current channel, from ctx.author.
Returns the response from ctx.author, if it is provided.
Parameters
----------
msg: string
Allows a custom input message to be provided.
Will use default message if not provided.
timeout: int
Number of seconds to wait before timing out.
Raises
------
cmdClient.lib.ResponseTimedOut:
Raised when ctx.author does not provide a response before the function times out.
"""
# Deliver prompt
offer_msg = await ctx.reply(msg or "Please enter your input.")
# Criteria for the input message
def checks(m):
return m.author == ctx.author and m.channel == ctx.ch
# Listen for the reply
try:
result_msg = await ctx.client.wait_for("message", check=checks, timeout=timeout)
except asyncio.TimeoutError:
raise ResponseTimedOut("Session timed out waiting for user response.") from None
result = result_msg.content
# Attempt to delete the prompt and reply messages
try:
await offer_msg.delete()
await result_msg.delete()
except Exception:
pass
return result
@Context.util
async def ask(ctx, msg, timeout=30, use_msg=None, del_on_timeout=False):
"""
Ask ctx.author a yes/no question.
Returns 0 if ctx.author answers no
Returns 1 if ctx.author answers yes
Parameters
----------
msg: string
Adds the question to the message string.
Requires an input.
timeout: int
Number of seconds to wait before timing out.
use_msg: string
A completely custom string to use instead of the default string.
del_on_timeout: bool
Whether to delete the question if it times out.
Raises
------
Nothing
"""
out = "{} {}".format(msg, "`y(es)`/`n(o)`")
offer_msg = use_msg or await ctx.reply(out)
if use_msg:
await use_msg.edit(content=msg)
result_msg = await ctx.listen_for(["y", "yes", "n", "no"], timeout=timeout)
if result_msg is None:
if del_on_timeout:
try:
await offer_msg.delete()
except Exception:
pass
return None
result = result_msg.content.lower()
try:
if not use_msg:
await offer_msg.delete()
await result_msg.delete()
except Exception:
pass
if result in ["n", "no"]:
return 0
return 1