rewrite: Initial rewrite skeleton.

Remove modules that will no longer be required.
Move pending modules to pending-rewrite folders.
This commit is contained in:
2022-09-17 17:06:13 +10:00
parent a7f7dd6e7b
commit a5147323b5
162 changed files with 1 additions and 866 deletions

View File

@@ -1,157 +0,0 @@
import asyncio
import discord
from LionContext import LionContext as Context
from cmdClient.lib import SafeCancellation
from data import tables
from core import Lion
from . import lib
from settings import GuildSettings, UserSettings
@Context.util
async def embed_reply(ctx, desc, colour=discord.Colour.orange(), **kwargs):
"""
Simple helper to embed replies.
All arguments are passed to the embed constructor.
`desc` is passed as the `description` kwarg.
"""
embed = discord.Embed(description=desc, colour=colour, **kwargs)
try:
return await ctx.reply(embed=embed, reference=ctx.msg.to_reference(fail_if_not_exists=False))
except discord.Forbidden:
if not ctx.guild or ctx.ch.permissions_for(ctx.guild.me).send_messages:
await ctx.reply("Command failed, I don't have permission to send embeds in this channel!")
raise SafeCancellation
@Context.util
async def error_reply(ctx, error_str, send_args={}, **kwargs):
"""
Notify the user of a user level error.
Typically, this will occur in a red embed, posted in the command channel.
"""
embed = discord.Embed(
colour=discord.Colour.red(),
description=error_str,
**kwargs
)
message = None
try:
message = await ctx.ch.send(
embed=embed,
reference=ctx.msg.to_reference(fail_if_not_exists=False),
**send_args
)
ctx.sent_messages.append(message)
return message
except discord.Forbidden:
if not ctx.guild or ctx.ch.permissions_for(ctx.guild.me).send_messages:
await ctx.reply("Command failed, I don't have permission to send embeds in this channel!")
raise SafeCancellation
@Context.util
async def offer_delete(ctx: Context, *to_delete, timeout=300):
"""
Offers to delete the provided messages via a reaction on the last message.
Removes the reaction if the offer times out.
If any exceptions occur, handles them silently and returns.
Parameters
----------
to_delete: List[Message]
The messages to delete.
timeout: int
Time in seconds after which to remove the delete offer reaction.
"""
# Get the delete emoji from the config
emoji = lib.cross
# Return if there are no messages to delete
if not to_delete:
return
# The message to add the reaction to
react_msg = to_delete[-1]
# Build the reaction check function
if ctx.guild:
modrole = ctx.guild_settings.mod_role.value if ctx.guild else None
def check(reaction, user):
if not (reaction.message.id == react_msg.id and reaction.emoji == emoji):
return False
if user == ctx.guild.me:
return False
return ((user == ctx.author)
or (user.permissions_in(ctx.ch).manage_messages)
or (modrole and modrole in user.roles))
else:
def check(reaction, user):
return user == ctx.author and reaction.message.id == react_msg.id and reaction.emoji == emoji
try:
# Add the reaction to the message
await react_msg.add_reaction(emoji)
# Wait for the user to press the reaction
reaction, user = await ctx.client.wait_for("reaction_add", check=check, timeout=timeout)
# Since the check was satisfied, the reaction is correct. Delete the messages, ignoring any exceptions
deleted = False
# First try to bulk delete if we have the permissions
if ctx.guild and ctx.ch.permissions_for(ctx.guild.me).manage_messages:
try:
await ctx.ch.delete_messages(to_delete)
deleted = True
except Exception:
deleted = False
# If we couldn't bulk delete, delete them one by one
if not deleted:
try:
asyncio.gather(*[message.delete() for message in to_delete], return_exceptions=True)
except Exception:
pass
except (asyncio.TimeoutError, asyncio.CancelledError):
# Timed out waiting for the reaction, attempt to remove the delete reaction
try:
await react_msg.remove_reaction(emoji, ctx.client.user)
except Exception:
pass
except discord.Forbidden:
pass
except discord.NotFound:
pass
except discord.HTTPException:
pass
def context_property(func):
setattr(Context, func.__name__, property(func))
return func
@context_property
def best_prefix(ctx):
return ctx.client.prefix
@context_property
def guild_settings(ctx):
if ctx.guild:
tables.guild_config.fetch_or_create(ctx.guild.id)
return GuildSettings(ctx.guild.id if ctx.guild else 0)
@context_property
def author_settings(ctx):
return UserSettings(ctx.author.id)
@context_property
def alion(ctx):
return Lion.fetch(ctx.guild.id if ctx.guild else 0, ctx.author.id)

View File

@@ -1,461 +0,0 @@
import asyncio
import discord
from LionContext import LionContext as Context
from cmdClient.lib import UserCancelled, ResponseTimedOut
from .lib import paginate_list
# TODO: Interactive locks
cancel_emoji = ''
number_emojis = (
'1', '2', '3', '4', '5', '6', '7', '8', '9'
)
async def discord_shield(coro):
try:
await coro
except discord.HTTPException:
pass
@Context.util
async def cancellable(ctx, msg, add_reaction=True, cancel_message=None, timeout=300):
"""
Add a cancellation reaction to the given message.
Pressing the reaction triggers cancellation of the original context, and a UserCancelled-style error response.
"""
# TODO: Not consistent with the exception driven flow, make a decision here?
# Add reaction
if add_reaction and cancel_emoji not in (str(r.emoji) for r in msg.reactions):
try:
await msg.add_reaction(cancel_emoji)
except discord.HTTPException:
return
# Define cancellation function
async def _cancel():
try:
await ctx.client.wait_for(
'reaction_add',
timeout=timeout,
check=lambda r, u: (u == ctx.author
and r.message == msg
and str(r.emoji) == cancel_emoji)
)
except asyncio.TimeoutError:
pass
else:
await ctx.client.active_command_response_cleaner(ctx)
if cancel_message:
await ctx.error_reply(cancel_message)
else:
try:
await ctx.msg.add_reaction(cancel_emoji)
except discord.HTTPException:
pass
[task.cancel() for task in ctx.tasks]
# Launch cancellation task
task = asyncio.create_task(_cancel())
ctx.tasks.append(task)
return task
@Context.util
async def listen_for(ctx, allowed_input=None, timeout=120, lower=True, check=None):
"""
Listen for a one of a particular set of input strings,
sent in the current channel by `ctx.author`.
When found, return the message containing them.
Parameters
----------
allowed_input: Union(List(str), None)
List of strings to listen for.
Allowed to be `None` precisely when a `check` function is also supplied.
timeout: int
Number of seconds to wait before timing out.
lower: bool
Whether to shift the allowed and message strings to lowercase before checking.
check: Function(message) -> bool
Alternative custom check function.
Returns: discord.Message
The message that was matched.
Raises
------
cmdClient.lib.ResponseTimedOut:
Raised when no messages matching the given criteria are detected in `timeout` seconds.
"""
# Generate the check if it hasn't been provided
if not check:
# Quick check the arguments are sane
if not allowed_input:
raise ValueError("allowed_input and check cannot both be None")
# Force a lower on the allowed inputs
allowed_input = [s.lower() for s in allowed_input]
# Create the check function
def check(message):
result = (message.author == ctx.author)
result = result and (message.channel == ctx.ch)
result = result and ((message.content.lower() if lower else message.content) in allowed_input)
return result
# Wait for a matching message, catch and transform the timeout
try:
message = await ctx.client.wait_for('message', check=check, timeout=timeout)
except asyncio.TimeoutError:
raise ResponseTimedOut("Session timed out waiting for user response.") from None
return message
@Context.util
async def selector(ctx, header, select_from, timeout=120, max_len=20):
"""
Interactive routine to prompt the `ctx.author` to select an item from a list.
Returns the list index that was selected.
Parameters
----------
header: str
String to put at the top of each page of selection options.
Intended to be information about the list the user is selecting from.
select_from: List(str)
The list of strings to select from.
timeout: int
The number of seconds to wait before throwing `ResponseTimedOut`.
max_len: int
The maximum number of items to display on each page.
Decrease this if the items are long, to avoid going over the char limit.
Returns
-------
int:
The index of the list entry selected by the user.
Raises
------
cmdClient.lib.UserCancelled:
Raised if the user manually cancels the selection.
cmdClient.lib.ResponseTimedOut:
Raised if the user fails to respond to the selector within `timeout` seconds.
"""
# Handle improper arguments
if len(select_from) == 0:
raise ValueError("Selection list passed to `selector` cannot be empty.")
# Generate the selector pages
footer = "Please reply with the number of your selection, or press {} to cancel.".format(cancel_emoji)
list_pages = paginate_list(select_from, block_length=max_len)
pages = ["\n".join([header, page, footer]) for page in list_pages]
# Post the pages in a paged message
out_msg = await ctx.pager(pages, add_cancel=True)
cancel_task = await ctx.cancellable(out_msg, add_reaction=False, timeout=None)
if len(select_from) <= 5:
for i, _ in enumerate(select_from):
asyncio.create_task(discord_shield(out_msg.add_reaction(number_emojis[i])))
# Build response tasks
valid_input = [str(i+1) for i in range(0, len(select_from))] + ['c', 'C']
listen_task = asyncio.create_task(ctx.listen_for(valid_input, timeout=None))
emoji_task = asyncio.create_task(ctx.client.wait_for(
'reaction_add',
check=lambda r, u: (u == ctx.author
and r.message == out_msg
and str(r.emoji) in number_emojis)
))
# Wait for the response tasks
done, pending = await asyncio.wait(
(listen_task, emoji_task),
timeout=timeout,
return_when=asyncio.FIRST_COMPLETED
)
# Cleanup
try:
await out_msg.delete()
except discord.HTTPException:
pass
# Handle different return cases
if listen_task in done:
emoji_task.cancel()
result_msg = listen_task.result()
try:
await result_msg.delete()
except discord.HTTPException:
pass
if result_msg.content.lower() == 'c':
raise UserCancelled("Selection cancelled!")
result = int(result_msg.content) - 1
elif emoji_task in done:
listen_task.cancel()
reaction, _ = emoji_task.result()
result = number_emojis.index(str(reaction.emoji))
elif cancel_task in done:
# Manually cancelled case.. the current task should have been cancelled
# Raise UserCancelled in case the task wasn't cancelled for some reason
raise UserCancelled("Selection cancelled!")
elif not done:
# Timeout case
raise ResponseTimedOut("Selector timed out waiting for a response.")
# Finally cancel the canceller and return the provided index
cancel_task.cancel()
return result
@Context.util
async def pager(ctx, pages, locked=True, start_at=0, add_cancel=False, **kwargs):
"""
Shows the user each page from the provided list `pages` one at a time,
providing reactions to page back and forth between pages.
This is done asynchronously, and returns after displaying the first page.
Parameters
----------
pages: List(Union(str, discord.Embed))
A list of either strings or embeds to display as the pages.
locked: bool
Whether only the `ctx.author` should be able to use the paging reactions.
kwargs: ...
Remaining keyword arguments are transparently passed to the reply context method.
Returns: discord.Message
This is the output message, returned for easy deletion.
"""
# Handle broken input
if len(pages) == 0:
raise ValueError("Pager cannot page with no pages!")
# Post first page. Method depends on whether the page is an embed or not.
if isinstance(pages[start_at], discord.Embed):
out_msg = await ctx.reply(embed=pages[start_at], **kwargs)
else:
out_msg = await ctx.reply(pages[start_at], **kwargs)
# Run the paging loop if required
if len(pages) > 1:
task = asyncio.create_task(_pager(ctx, out_msg, pages, locked, start_at, add_cancel, **kwargs))
ctx.tasks.append(task)
elif add_cancel:
await out_msg.add_reaction(cancel_emoji)
# Return the output message
return out_msg
async def _pager(ctx, out_msg, pages, locked, start_at, add_cancel, **kwargs):
"""
Asynchronous initialiser and loop for the `pager` utility above.
"""
# Page number
page = start_at
# Add reactions to the output message
next_emoji = ""
prev_emoji = ""
try:
await out_msg.add_reaction(prev_emoji)
if add_cancel:
await out_msg.add_reaction(cancel_emoji)
await out_msg.add_reaction(next_emoji)
except discord.Forbidden:
# We don't have permission to add paging emojis
# Die as gracefully as we can
if ctx.guild:
perms = ctx.ch.permissions_for(ctx.guild.me)
if not perms.add_reactions:
await ctx.error_reply(
"Cannot page results because I do not have the `add_reactions` permission!"
)
elif not perms.read_message_history:
await ctx.error_reply(
"Cannot page results because I do not have the `read_message_history` permission!"
)
else:
await ctx.error_reply(
"Cannot page results due to insufficient permissions!"
)
else:
await ctx.error_reply(
"Cannot page results!"
)
return
# Check function to determine whether a reaction is valid
def reaction_check(reaction, user):
result = reaction.message.id == out_msg.id
result = result and str(reaction.emoji) in [next_emoji, prev_emoji]
result = result and not (user.id == ctx.client.user.id)
result = result and not (locked and user != ctx.author)
return result
# Check function to determine if message has a page number
def message_check(message):
result = message.channel.id == ctx.ch.id
result = result and not (locked and message.author != ctx.author)
result = result and message.content.lower().startswith('p')
result = result and message.content[1:].isdigit()
result = result and 1 <= int(message.content[1:]) <= len(pages)
return result
# Begin loop
while True:
# Wait for a valid reaction or message, break if we time out
reaction_task = asyncio.create_task(
ctx.client.wait_for('reaction_add', check=reaction_check)
)
message_task = asyncio.create_task(
ctx.client.wait_for('message', check=message_check)
)
done, pending = await asyncio.wait(
(reaction_task, message_task),
timeout=300,
return_when=asyncio.FIRST_COMPLETED
)
if done:
if reaction_task in done:
# Cancel the message task and collect the reaction result
message_task.cancel()
reaction, user = reaction_task.result()
# Attempt to remove the user's reaction, silently ignore errors
asyncio.ensure_future(out_msg.remove_reaction(reaction.emoji, user))
# Change the page number
page += 1 if reaction.emoji == next_emoji else -1
page %= len(pages)
elif message_task in done:
# Cancel the reaction task and collect the message result
reaction_task.cancel()
message = message_task.result()
# Attempt to delete the user's message, silently ignore errors
asyncio.ensure_future(message.delete())
# Move to the correct page
page = int(message.content[1:]) - 1
# Edit the message with the new page
active_page = pages[page]
if isinstance(active_page, discord.Embed):
await out_msg.edit(embed=active_page, **kwargs)
else:
await out_msg.edit(content=active_page, **kwargs)
else:
# No tasks finished, so we must have timed out, or had an exception.
# Break the loop and clean up
break
# Clean up by removing the reactions
try:
await out_msg.clear_reactions()
except discord.Forbidden:
try:
await out_msg.remove_reaction(next_emoji, ctx.client.user)
await out_msg.remove_reaction(prev_emoji, ctx.client.user)
except discord.NotFound:
pass
except discord.NotFound:
pass
@Context.util
async def input(ctx, msg="", timeout=120):
"""
Listen for a response in the current channel, from ctx.author.
Returns the response from ctx.author, if it is provided.
Parameters
----------
msg: string
Allows a custom input message to be provided.
Will use default message if not provided.
timeout: int
Number of seconds to wait before timing out.
Raises
------
cmdClient.lib.ResponseTimedOut:
Raised when ctx.author does not provide a response before the function times out.
"""
# Deliver prompt
offer_msg = await ctx.reply(msg or "Please enter your input.")
# Criteria for the input message
def checks(m):
return m.author == ctx.author and m.channel == ctx.ch
# Listen for the reply
try:
result_msg = await ctx.client.wait_for("message", check=checks, timeout=timeout)
except asyncio.TimeoutError:
raise ResponseTimedOut("Session timed out waiting for user response.") from None
result = result_msg.content
# Attempt to delete the prompt and reply messages
try:
await offer_msg.delete()
await result_msg.delete()
except Exception:
pass
return result
@Context.util
async def ask(ctx, msg, timeout=30, use_msg=None, del_on_timeout=False):
"""
Ask ctx.author a yes/no question.
Returns 0 if ctx.author answers no
Returns 1 if ctx.author answers yes
Parameters
----------
msg: string
Adds the question to the message string.
Requires an input.
timeout: int
Number of seconds to wait before timing out.
use_msg: string
A completely custom string to use instead of the default string.
del_on_timeout: bool
Whether to delete the question if it times out.
Raises
------
Nothing
"""
out = "{} {}".format(msg, "`y(es)`/`n(o)`")
offer_msg = use_msg or await ctx.reply(out)
if use_msg and msg:
await use_msg.edit(content=msg)
result_msg = await ctx.listen_for(["y", "yes", "n", "no"], timeout=timeout)
if result_msg is None:
if del_on_timeout:
try:
await offer_msg.delete()
except Exception:
pass
return None
result = result_msg.content.lower()
try:
if not use_msg:
await offer_msg.delete()
await result_msg.delete()
except Exception:
pass
if result in ["n", "no"]:
return 0
return 1

View File

@@ -1,553 +0,0 @@
import datetime
import iso8601
import re
from enum import Enum
import discord
from psycopg2.extensions import QuotedString
from cmdClient.lib import SafeCancellation
multiselect_regex = re.compile(
r"^([0-9, -]+)$",
re.DOTALL | re.IGNORECASE | re.VERBOSE
)
tick = ''
cross = ''
def prop_tabulate(prop_list, value_list, indent=True, colon=True):
"""
Turns a list of properties and corresponding list of values into
a pretty string with one `prop: value` pair each line,
padded so that the colons in each line are lined up.
Handles empty props by using an extra couple of spaces instead of a `:`.
Parameters
----------
prop_list: List[str]
List of short names to put on the right side of the list.
Empty props are considered to be "newlines" for the corresponding value.
value_list: List[str]
List of values corresponding to the properties above.
indent: bool
Whether to add padding so the properties are right-adjusted.
Returns: str
"""
max_len = max(len(prop) for prop in prop_list)
return "".join(["`{}{}{}`\t{}{}".format(" " * (max_len - len(prop)) if indent else "",
prop,
(":" if len(prop) else " " * 2) if colon else '',
value_list[i],
'' if str(value_list[i]).endswith("```") else '\n')
for i, prop in enumerate(prop_list)])
def paginate_list(item_list, block_length=20, style="markdown", title=None):
"""
Create pretty codeblock pages from a list of strings.
Parameters
----------
item_list: List[str]
List of strings to paginate.
block_length: int
Maximum number of strings per page.
style: str
Codeblock style to use.
Title formatting assumes the `markdown` style, and numbered lists work well with this.
However, `markdown` sometimes messes up formatting in the list.
title: str
Optional title to add to the top of each page.
Returns: List[str]
List of pages, each formatted into a codeblock,
and containing at most `block_length` of the provided strings.
"""
lines = ["{0:<5}{1:<5}".format("{}.".format(i + 1), str(line)) for i, line in enumerate(item_list)]
page_blocks = [lines[i:i + block_length] for i in range(0, len(lines), block_length)]
pages = []
for i, block in enumerate(page_blocks):
pagenum = "Page {}/{}".format(i + 1, len(page_blocks))
if title:
header = "{} ({})".format(title, pagenum) if len(page_blocks) > 1 else title
else:
header = pagenum
header_line = "=" * len(header)
full_header = "{}\n{}\n".format(header, header_line) if len(page_blocks) > 1 or title else ""
pages.append("```{}\n{}{}```".format(style, full_header, "\n".join(block)))
return pages
def timestamp_utcnow():
"""
Return the current integer UTC timestamp.
"""
return int(datetime.datetime.timestamp(datetime.datetime.utcnow()))
def split_text(text, blocksize=2000, code=True, syntax="", maxheight=50):
"""
Break the text into blocks of maximum length blocksize
If possible, break across nearby newlines. Otherwise just break at blocksize chars
Parameters
----------
text: str
Text to break into blocks.
blocksize: int
Maximum character length for each block.
code: bool
Whether to wrap each block in codeblocks (these are counted in the blocksize).
syntax: str
The markdown formatting language to use for the codeblocks, if applicable.
maxheight: int
The maximum number of lines in each block
Returns: List[str]
List of blocks,
each containing at most `block_size` characters,
of height at most `maxheight`.
"""
# Adjust blocksize to account for the codeblocks if required
blocksize = blocksize - 8 - len(syntax) if code else blocksize
# Build the blocks
blocks = []
while True:
# If the remaining text is already small enough, append it
if len(text) <= blocksize:
blocks.append(text)
break
text = text.strip('\n')
# Find the last newline in the prototype block
split_on = text[0:blocksize].rfind('\n')
split_on = blocksize if split_on < blocksize // 5 else split_on
# Add the block and truncate the text
blocks.append(text[0:split_on])
text = text[split_on:]
# Add the codeblock ticks and the code syntax header, if required
if code:
blocks = ["```{}\n{}\n```".format(syntax, block) for block in blocks]
return blocks
def strfdelta(delta, sec=False, minutes=True, short=False):
"""
Convert a datetime.timedelta object into an easily readable duration string.
Parameters
----------
delta: datetime.timedelta
The timedelta object to convert into a readable string.
sec: bool
Whether to include the seconds from the timedelta object in the string.
minutes: bool
Whether to include the minutes from the timedelta object in the string.
short: bool
Whether to abbreviate the units of time ("hour" to "h", "minute" to "m", "second" to "s").
Returns: str
A string containing a time from the datetime.timedelta object, in a readable format.
Time units will be abbreviated if short was set to True.
"""
output = [[delta.days, 'd' if short else ' day'],
[delta.seconds // 3600, 'h' if short else ' hour']]
if minutes:
output.append([delta.seconds // 60 % 60, 'm' if short else ' minute'])
if sec:
output.append([delta.seconds % 60, 's' if short else ' second'])
for i in range(len(output)):
if output[i][0] != 1 and not short:
output[i][1] += 's'
reply_msg = []
if output[0][0] != 0:
reply_msg.append("{}{} ".format(output[0][0], output[0][1]))
if output[0][0] != 0 or output[1][0] != 0 or len(output) == 2:
reply_msg.append("{}{} ".format(output[1][0], output[1][1]))
for i in range(2, len(output) - 1):
reply_msg.append("{}{} ".format(output[i][0], output[i][1]))
if not short and reply_msg:
reply_msg.append("and ")
reply_msg.append("{}{}".format(output[-1][0], output[-1][1]))
return "".join(reply_msg)
def parse_dur(time_str):
"""
Parses a user provided time duration string into a timedelta object.
Parameters
----------
time_str: str
The time string to parse. String can include days, hours, minutes, and seconds.
Returns: int
The number of seconds the duration represents.
"""
funcs = {'d': lambda x: x * 24 * 60 * 60,
'h': lambda x: x * 60 * 60,
'm': lambda x: x * 60,
's': lambda x: x}
time_str = time_str.strip(" ,")
found = re.findall(r'(\d+)\s?(\w+?)', time_str)
seconds = 0
for bit in found:
if bit[1] in funcs:
seconds += funcs[bit[1]](int(bit[0]))
return seconds
def strfdur(duration, short=True, show_days=False):
"""
Convert a duration given in seconds to a number of hours, minutes, and seconds.
"""
days = duration // (3600 * 24) if show_days else 0
hours = duration // 3600
if days:
hours %= 24
minutes = duration // 60 % 60
seconds = duration % 60
parts = []
if days:
unit = 'd' if short else (' days' if days != 1 else ' day')
parts.append('{}{}'.format(days, unit))
if hours:
unit = 'h' if short else (' hours' if hours != 1 else ' hour')
parts.append('{}{}'.format(hours, unit))
if minutes:
unit = 'm' if short else (' minutes' if minutes != 1 else ' minute')
parts.append('{}{}'.format(minutes, unit))
if seconds or duration == 0:
unit = 's' if short else (' seconds' if seconds != 1 else ' second')
parts.append('{}{}'.format(seconds, unit))
if short:
return ' '.join(parts)
else:
return ', '.join(parts)
def substitute_ranges(ranges_str, max_match=20, max_range=1000, separator=','):
"""
Substitutes a user provided list of numbers and ranges,
and replaces the ranges by the corresponding list of numbers.
Parameters
----------
ranges_str: str
The string to ranges in.
max_match: int
The maximum number of ranges to replace.
Any ranges exceeding this will be ignored.
max_range: int
The maximum length of range to replace.
Attempting to replace a range longer than this will raise a `ValueError`.
"""
def _repl(match):
n1 = int(match.group(1))
n2 = int(match.group(2))
if n2 - n1 > max_range:
raise SafeCancellation("Provided range is too large!")
return separator.join(str(i) for i in range(n1, n2 + 1))
return re.sub(r'(\d+)\s*-\s*(\d+)', _repl, ranges_str, max_match)
def parse_ranges(ranges_str, ignore_errors=False, separator=',', **kwargs):
"""
Parses a user provided range string into a list of numbers.
Extra keyword arguments are transparently passed to the underlying parser `substitute_ranges`.
"""
substituted = substitute_ranges(ranges_str, separator=separator, **kwargs)
numbers = (item.strip() for item in substituted.split(','))
numbers = [item for item in numbers if item]
integers = [int(item) for item in numbers if item.isdigit()]
if not ignore_errors and len(integers) != len(numbers):
raise SafeCancellation(
"Couldn't parse the provided selection!\n"
"Please provide comma separated numbers and ranges, e.g. `1, 5, 6-9`."
)
return integers
def msg_string(msg, mask_link=False, line_break=False, tz=None, clean=True):
"""
Format a message into a string with various information, such as:
the timestamp of the message, author, message content, and attachments.
Parameters
----------
msg: Message
The message to format.
mask_link: bool
Whether to mask the URLs of any attachments.
line_break: bool
Whether a line break should be used in the string.
tz: Timezone
The timezone to use in the formatted message.
clean: bool
Whether to use the clean content of the original message.
Returns: str
A formatted string containing various information:
User timezone, message author, message content, attachments
"""
timestr = "%I:%M %p, %d/%m/%Y"
if tz:
time = iso8601.parse_date(msg.timestamp.isoformat()).astimezone(tz).strftime(timestr)
else:
time = msg.timestamp.strftime(timestr)
user = str(msg.author)
attach_list = [attach["url"] for attach in msg.attachments if "url" in attach]
if mask_link:
attach_list = ["[Link]({})".format(url) for url in attach_list]
attachments = "\nAttachments: {}".format(", ".join(attach_list)) if attach_list else ""
return "`[{time}]` **{user}:** {line_break}{message} {attachments}".format(
time=time,
user=user,
line_break="\n" if line_break else "",
message=msg.clean_content if clean else msg.content,
attachments=attachments
)
def convdatestring(datestring):
"""
Convert a date string into a datetime.timedelta object.
Parameters
----------
datestring: str
The string to convert to a datetime.timedelta object.
Returns: datetime.timedelta
A datetime.timedelta object formed from the string provided.
"""
datestring = datestring.strip(' ,')
datearray = []
funcs = {'d': lambda x: x * 24 * 60 * 60,
'h': lambda x: x * 60 * 60,
'm': lambda x: x * 60,
's': lambda x: x}
currentnumber = ''
for char in datestring:
if char.isdigit():
currentnumber += char
else:
if currentnumber == '':
continue
datearray.append((int(currentnumber), char))
currentnumber = ''
seconds = 0
if currentnumber:
seconds += int(currentnumber)
for i in datearray:
if i[1] in funcs:
seconds += funcs[i[1]](i[0])
return datetime.timedelta(seconds=seconds)
class _rawChannel(discord.abc.Messageable):
"""
Raw messageable class representing an arbitrary channel,
not necessarially seen by the gateway.
"""
def __init__(self, state, id):
self._state = state
self.id = id
async def _get_channel(self):
return discord.Object(self.id)
async def mail(client: discord.Client, channelid: int, **msg_args):
"""
Mails a message to a channelid which may be invisible to the gateway.
Parameters:
client: discord.Client
The client to use for mailing.
Must at least have static authentication and have a valid `_connection`.
channelid: int
The channel id to mail to.
msg_args: Any
Message keyword arguments which are passed transparently to `_rawChannel.send(...)`.
"""
# Create the raw channel
channel = _rawChannel(client._connection, channelid)
return await channel.send(**msg_args)
def emb_add_fields(embed, emb_fields):
"""
Append embed fields to an embed.
Parameters
----------
embed: discord.Embed
The embed to add the field to.
emb_fields: tuple
The values to add to a field.
name: str
The name of the field.
value: str
The value of the field.
inline: bool
Whether the embed field should be inline or not.
"""
for field in emb_fields:
embed.add_field(name=str(field[0]), value=str(field[1]), inline=bool(field[2]))
def join_list(string, nfs=False):
"""
Join a list together, separated with commas, plus add "and" to the beginning of the last value.
Parameters
----------
string: list
The list to join together.
nfs: bool
(no fullstops)
Whether to exclude fullstops/periods from the output messages.
If not provided, fullstops will be appended to the output.
"""
if len(string) > 1:
return "{}{} and {}{}".format((", ").join(string[:-1]),
"," if len(string) > 2 else "", string[-1], "" if nfs else ".")
else:
return "{}{}".format("".join(string), "" if nfs else ".")
def format_activity(user):
"""
Format a user's activity string, depending on the type of activity.
Currently supported types are:
- Nothing
- Custom status
- Playing (with rich presence support)
- Streaming
- Listening (with rich presence support)
- Watching
- Unknown
Parameters
----------
user: discord.Member
The user to format the status of.
If the user has no activity, "Nothing" will be returned.
Returns: str
A formatted string with various information about the user's current activity like the name,
and any extra information about the activity (such as current song artists for Spotify)
"""
if not user.activity:
return "Nothing"
AT = user.activity.type
a = user.activity
if str(AT) == "ActivityType.custom":
return "Status: {}".format(a)
if str(AT) == "ActivityType.playing":
string = "Playing {}".format(a.name)
try:
string += " ({})".format(a.details)
except Exception:
pass
return string
if str(AT) == "ActivityType.streaming":
return "Streaming {}".format(a.name)
if str(AT) == "ActivityType.listening":
try:
string = "Listening to `{}`".format(a.title)
if len(a.artists) > 1:
string += " by {}".format(join_list(string=a.artists))
else:
string += " by **{}**".format(a.artist)
except Exception:
string = "Listening to `{}`".format(a.name)
return string
if str(AT) == "ActivityType.watching":
return "Watching `{}`".format(a.name)
if str(AT) == "ActivityType.unknown":
return "Unknown"
def shard_of(shard_count: int, guildid: int):
"""
Calculate the shard number of a given guild.
"""
return (guildid >> 22) % shard_count if shard_count and shard_count > 0 else 0
def jumpto(guildid: int, channeldid: int, messageid: int):
"""
Build a jump link for a message given its location.
"""
return 'https://discord.com/channels/{}/{}/{}'.format(
guildid,
channeldid,
messageid
)
class DotDict(dict):
"""
Dict-type allowing dot access to keys.
"""
__getattr__ = dict.get
__setattr__ = dict.__setitem__
__delattr__ = dict.__delitem__
class FieldEnum(str, Enum):
"""
String enum with description conforming to the ISQLQuote protocol.
Allows processing by psycog
"""
def __new__(cls, value, desc):
obj = str.__new__(cls, value)
obj._value_ = value
obj.desc = desc
return obj
def __repr__(self):
return '<%s.%s>' % (self.__class__.__name__, self.name)
def __bool__(self):
return True
def __conform__(self, proto):
return QuotedString(self.value)
def utc_now():
"""
Return the current timezone-aware utc timestamp.
"""
return datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
def multiple_replace(string, rep_dict):
if rep_dict:
pattern = re.compile(
"|".join([re.escape(k) for k in sorted(rep_dict, key=len, reverse=True)]),
flags=re.DOTALL
)
return pattern.sub(lambda x: str(rep_dict[x.group(0)]), string)
else:
return string

View File

@@ -1,92 +0,0 @@
import time
from cmdClient.lib import SafeCancellation
from cachetools import TTLCache
class BucketFull(Exception):
"""
Throw when a requested Bucket is already full
"""
pass
class BucketOverFull(BucketFull):
"""
Throw when a requested Bucket is overfull
"""
pass
class Bucket:
__slots__ = ('max_level', 'empty_time', 'leak_rate', '_level', '_last_checked', '_last_full')
def __init__(self, max_level, empty_time):
self.max_level = max_level
self.empty_time = empty_time
self.leak_rate = max_level / empty_time
self._level = 0
self._last_checked = time.time()
self._last_full = False
@property
def overfull(self):
self._leak()
return self._level > self.max_level
def _leak(self):
if self._level:
elapsed = time.time() - self._last_checked
self._level = max(0, self._level - (elapsed * self.leak_rate))
self._last_checked = time.time()
def request(self):
self._leak()
if self._level + 1 > self.max_level + 1:
raise BucketOverFull
elif self._level + 1 > self.max_level:
self._level += 1
if self._last_full:
raise BucketOverFull
else:
self._last_full = True
raise BucketFull
else:
self._last_full = False
self._level += 1
class RateLimit:
def __init__(self, max_level, empty_time, error=None, cache=TTLCache(1000, 60 * 60)):
self.max_level = max_level
self.empty_time = empty_time
self.error = error or "Too many requests, please slow down!"
self.buckets = cache
def request_for(self, key):
if not (bucket := self.buckets.get(key, None)):
bucket = self.buckets[key] = Bucket(self.max_level, self.empty_time)
try:
bucket.request()
except BucketOverFull:
raise SafeCancellation(details="Bucket overflow")
except BucketFull:
raise SafeCancellation(self.error, details="Bucket full")
def ward(self, member=True, key=None):
"""
Command ratelimit decorator.
"""
key = key or ((lambda ctx: (ctx.guild.id, ctx.author.id)) if member else (lambda ctx: ctx.author.id))
def decorator(func):
async def wrapper(ctx, *args, **kwargs):
self.request_for(key(ctx))
return await func(ctx, *args, **kwargs)
return wrapper
return decorator

View File

@@ -1,427 +0,0 @@
import asyncio
import discord
from LionContext import LionContext as Context
from cmdClient.lib import InvalidContext, UserCancelled, ResponseTimedOut, SafeCancellation
from . import interactive as _interactive
@Context.util
async def find_role(ctx, userstr, create=False, interactive=False, collection=None, allow_notfound=True):
"""
Find a guild role given a partial matching string,
allowing custom role collections and several behavioural switches.
Parameters
----------
userstr: str
String obtained from a user, expected to partially match a role in the collection.
The string will be tested against both the id and the name of the role.
create: bool
Whether to offer to create the role if it does not exist.
The bot will only offer to create the role if it has the `manage_channels` permission.
interactive: bool
Whether to offer the user a list of roles to choose from,
or pick the first matching role.
collection: List[Union[discord.Role, discord.Object]]
Collection of roles to search amongst.
If none, uses the guild role list.
allow_notfound: bool
Whether to return `None` when there are no matches, instead of raising `SafeCancellation`.
Overriden by `create`, if it is set.
Returns
-------
discord.Role:
If a valid role is found.
None:
If no valid role has been found.
Raises
------
cmdClient.lib.UserCancelled:
If the user cancels interactive role selection.
cmdClient.lib.ResponseTimedOut:
If the user fails to respond to interactive role selection within `60` seconds`
cmdClient.lib.SafeCancellation:
If `allow_notfound` is `False`, and the search returned no matches.
"""
# Handle invalid situations and input
if not ctx.guild:
raise InvalidContext("Attempt to use find_role outside of a guild.")
if userstr == "":
raise ValueError("User string passed to find_role was empty.")
# Create the collection to search from args or guild roles
collection = collection if collection is not None else ctx.guild.roles
# If the unser input was a number or possible role mention, get it out
userstr = userstr.strip()
roleid = userstr.strip('<#@&!> ')
roleid = int(roleid) if roleid.isdigit() else None
searchstr = userstr.lower()
# Find the role
role = None
# Check method to determine whether a role matches
def check(role):
return (role.id == roleid) or (searchstr in role.name.lower())
# Get list of matching roles
roles = list(filter(check, collection))
if len(roles) == 0:
# Nope
role = None
elif len(roles) == 1:
# Select our lucky winner
role = roles[0]
else:
# We have multiple matching roles!
if interactive:
# Interactive prompt with the list of roles, handle `Object`s
role_names = [
role.name if isinstance(role, discord.Role) else str(role.id) for role in roles
]
try:
selected = await ctx.selector(
"`{}` roles found matching `{}`!".format(len(roles), userstr),
role_names,
timeout=60
)
except UserCancelled:
raise UserCancelled("User cancelled role selection.") from None
except ResponseTimedOut:
raise ResponseTimedOut("Role selection timed out.") from None
role = roles[selected]
else:
# Just select the first one
role = roles[0]
# Handle non-existence of the role
if role is None:
msgstr = "Couldn't find a role matching `{}`!".format(userstr)
if create:
# Inform the user
msg = await ctx.error_reply(msgstr)
if ctx.guild.me.guild_permissions.manage_roles:
# Offer to create it
resp = await ctx.ask("Would you like to create this role?", timeout=30)
if resp:
# They accepted, create the role
# Before creation, check if the role name is too long
if len(userstr) > 100:
await ctx.error_reply("Could not create a role with a name over 100 characters long!")
else:
role = await ctx.guild.create_role(
name=userstr,
reason="Interactive role creation for {} (uid:{})".format(ctx.author, ctx.author.id)
)
await msg.delete()
await ctx.reply("You have created the role `{}`!".format(userstr))
# If we still don't have a role, cancel unless allow_notfound is set
if role is None and not allow_notfound:
raise SafeCancellation
elif not allow_notfound:
raise SafeCancellation(msgstr)
else:
await ctx.error_reply(msgstr)
return role
@Context.util
async def find_channel(ctx, userstr, interactive=False, collection=None, chan_type=None, type_name=None):
"""
Find a guild channel given a partial matching string,
allowing custom channel collections and several behavioural switches.
Parameters
----------
userstr: str
String obtained from a user, expected to partially match a channel in the collection.
The string will be tested against both the id and the name of the channel.
interactive: bool
Whether to offer the user a list of channels to choose from,
or pick the first matching channel.
collection: List(discord.Channel)
Collection of channels to search amongst.
If none, uses the full guild channel list.
chan_type: discord.ChannelType
Type of channel to restrict the collection to.
type_name: str
Optional name to use for the channel type if it is not found.
Used particularly with custom collections.
Returns
-------
discord.Channel:
If a valid channel is found.
None:
If no valid channel has been found.
Raises
------
cmdClient.lib.UserCancelled:
If the user cancels interactive channel selection.
cmdClient.lib.ResponseTimedOut:
If the user fails to respond to interactive channel selection within `60` seconds`
"""
# Handle invalid situations and input
if not ctx.guild:
raise InvalidContext("Attempt to use find_channel outside of a guild.")
if userstr == "":
raise ValueError("User string passed to find_channel was empty.")
# Create the collection to search from args or guild channels
collection = collection if collection else ctx.guild.channels
if chan_type is not None:
if chan_type == discord.ChannelType.text:
# Hack to support news channels as text channels
collection = [chan for chan in collection if isinstance(chan, discord.TextChannel)]
else:
collection = [chan for chan in collection if chan.type == chan_type]
# If the user input was a number or possible channel mention, extract it
chanid = userstr.strip('<#@&!>')
chanid = int(chanid) if chanid.isdigit() else None
searchstr = userstr.lower()
# Find the channel
chan = None
# Check method to determine whether a channel matches
def check(chan):
return (chan.id == chanid) or (searchstr in chan.name.lower())
# Get list of matching roles
channels = list(filter(check, collection))
if len(channels) == 0:
# Nope
chan = None
elif len(channels) == 1:
# Select our lucky winner
chan = channels[0]
else:
# We have multiple matching channels!
if interactive:
# Interactive prompt with the list of channels
chan_names = [chan.name for chan in channels]
try:
selected = await ctx.selector(
"`{}` channels found matching `{}`!".format(len(channels), userstr),
chan_names,
timeout=60
)
except UserCancelled:
raise UserCancelled("User cancelled channel selection.") from None
except ResponseTimedOut:
raise ResponseTimedOut("Channel selection timed out.") from None
chan = channels[selected]
else:
# Just select the first one
chan = channels[0]
if chan is None:
typestr = type_name
addendum = ""
if chan_type and not type_name:
chan_type_strings = {
discord.ChannelType.category: "category",
discord.ChannelType.text: "text channel",
discord.ChannelType.voice: "voice channel",
discord.ChannelType.stage_voice: "stage channel",
}
typestr = chan_type_strings.get(chan_type, None)
if typestr and chanid:
actual = ctx.guild.get_channel(chanid)
if actual and actual.type in chan_type_strings:
addendum = "\n{} appears to be a {} instead.".format(
actual.mention,
chan_type_strings[actual.type]
)
typestr = typestr or "channel"
await ctx.error_reply("Couldn't find a {} matching `{}`!{}".format(typestr, userstr, addendum))
return chan
@Context.util
async def find_member(ctx, userstr, interactive=False, collection=None, silent=False):
"""
Find a guild member given a partial matching string,
allowing custom member collections.
Parameters
----------
userstr: str
String obtained from a user, expected to partially match a member in the collection.
The string will be tested against both the userid, full user name and user nickname.
interactive: bool
Whether to offer the user a list of members to choose from,
or pick the first matching channel.
collection: List(discord.Member)
Collection of members to search amongst.
If none, uses the full guild member list.
silent: bool
Whether to reply with an error when there are no matches.
Returns
-------
discord.Member:
If a valid member is found.
None:
If no valid member has been found.
Raises
------
cmdClient.lib.UserCancelled:
If the user cancels interactive member selection.
cmdClient.lib.ResponseTimedOut:
If the user fails to respond to interactive member selection within `60` seconds`
"""
# Handle invalid situations and input
if not ctx.guild:
raise InvalidContext("Attempt to use find_member outside of a guild.")
if userstr == "":
raise ValueError("User string passed to find_member was empty.")
# Create the collection to search from args or guild members
collection = collection if collection else ctx.guild.members
# If the user input was a number or possible member mention, extract it
userid = userstr.strip('<#@&!>')
userid = int(userid) if userid.isdigit() else None
searchstr = userstr.lower()
# Find the member
member = None
# Check method to determine whether a member matches
def check(member):
return (
member.id == userid
or searchstr in member.display_name.lower()
or searchstr in str(member).lower()
)
# Get list of matching roles
members = list(filter(check, collection))
if len(members) == 0:
# Nope
member = None
elif len(members) == 1:
# Select our lucky winner
member = members[0]
else:
# We have multiple matching members!
if interactive:
# Interactive prompt with the list of members
member_names = [
"{} {}".format(
member.nick if member.nick else (member if members.count(member) > 1
else member.name),
("<{}>".format(member)) if member.nick else ""
) for member in members
]
try:
selected = await ctx.selector(
"`{}` members found matching `{}`!".format(len(members), userstr),
member_names,
timeout=60
)
except UserCancelled:
raise UserCancelled("User cancelled member selection.") from None
except ResponseTimedOut:
raise ResponseTimedOut("Member selection timed out.") from None
member = members[selected]
else:
# Just select the first one
member = members[0]
if member is None and not silent:
await ctx.error_reply("Couldn't find a member matching `{}`!".format(userstr))
return member
@Context.util
async def find_message(ctx, msgid, chlist=None, ignore=[]):
"""
Searches for the given message id in the guild channels.
Parameters
-------
msgid: int
The `id` of the message to search for.
chlist: Optional[List[discord.TextChannel]]
List of channels to search in.
If `None`, searches all the text channels that the `ctx.author` can read.
ignore: list
A list of channelids to explicitly ignore in the search.
Returns
-------
Optional[discord.Message]:
If a message is found, returns the message.
Otherwise, returns `None`.
"""
if not ctx.guild:
raise InvalidContext("Cannot use this seeker outside of a guild!")
msgid = int(msgid)
# Build the channel list to search
if chlist is None:
chlist = [ch for ch in ctx.guild.text_channels if ch.permissions_for(ctx.author).read_messages]
# Remove any channels we are ignoring
chlist = [ch for ch in chlist if ch.id not in ignore]
tasks = set()
i = 0
while True:
done = set((task for task in tasks if task.done()))
tasks = tasks.difference(done)
results = [task.result() for task in done]
result = next((result for result in results if result is not None), None)
if result:
[task.cancel() for task in tasks]
return result
if i < len(chlist):
task = asyncio.create_task(_search_in_channel(chlist[i], msgid))
tasks.add(task)
i += 1
elif len(tasks) == 0:
return None
await asyncio.sleep(0.1)
async def _search_in_channel(channel: discord.TextChannel, msgid: int):
if not isinstance(channel, discord.TextChannel):
return
try:
message = await channel.fetch_message(msgid)
except Exception:
return None
else:
return message