(LCtx): Refactor utility wrapping.

This commit is contained in:
2022-01-20 07:48:13 +02:00
parent 6f8748a722
commit ef81ab7afe
3 changed files with 91 additions and 88 deletions

View File

@@ -1,48 +1,87 @@
import datetime import types
import discord
from cmdClient import Context from cmdClient import Context
from cmdClient.logger import log from cmdClient.logger import log
reply_callbacks: list = [] # TODO Extend to all cmdClient.Context.Utils to give flexibility to modules
class LionContext(Context): class LionContext(Context):
""" """
Subclass to allow easy attachment of custom hooks and structure to contexts. Subclass to allow easy attachment of custom hooks and structure to contexts.
""" """
@classmethod
def __init__(self, client, **kwargs): def util(cls, util_func):
super().__init__(client, **kwargs) """
Decorator to make a utility function available as a Context instance method.
Extends the default Context method to add logging and to return the utility function.
"""
super().util(util_func)
log(f"Attached context utility function: {util_func.__name__}")
return util_func
@classmethod @classmethod
def util(self, util_func): def wrappable_util(cls, util_func):
""" """
Decorator to make a utility function available as a Context instance method Decorator to add a Wrappable utility function as a Context instance method.
""" """
log('added util_function: ' + util_func.__name__) wrappable = Wrappable(util_func)
super().util(wrappable)
def util_fun_wrapper(*args, **kwargs): log(f"Attached wrappable context utility function: {util_func.__name__}")
[args, kwargs] = self.util_pre(util_func, *args, **kwargs) return wrappable
return util_func(*args, **kwargs)
util_fun_wrapper.__name__ = util_func.__name__ # Hack
super().util(util_fun_wrapper)
@classmethod
def util_pre(self, util_func, *args, **kwargs):
if util_func.__name__ == 'reply':
for cb in reply_callbacks:
[args, kwargs] = cb(util_func, *args, **kwargs) # Nesting handlers. Note: args and kwargs are mutable
return [args, kwargs]
def register_reply_callback(func): class Wrappable:
reply_callbacks.append(func) __slots = ('_func', 'wrappers')
def unregister_reply_callback(func): def __init__(self, func):
reply_callbacks.remove(func) self._func = func
self.wrappers = None
@property
def __name__(self):
return self._func.__name__
def add_wrapper(self, func, name=None):
self.wrappers = self.wrappers or {}
name = name or func.__name__
self.wrappers[name] = func
log(
f"Added wrapper '{name}' to Wrappable '{self._func.__name__}'.",
context="Wrapping"
)
def remove_wrapper(self, name):
if not self.wrappers or name not in self.wrappers:
raise ValueError(
f"Cannot remove non-existent wrapper '{name}' from Wrappable '{self._func.__name__}'"
)
self.wrappers.pop(name)
log(
f"Removed wrapper '{name}' from Wrappable '{self._func.__name__}'.",
context="Wrapping"
)
def __call__(self, *args, **kwargs):
print(args, kwargs)
if self.wrappers:
return self._wrapped(iter(self.wrappers.values()))(*args, **kwargs)
else:
return self._func(*args, **kwargs)
def _wrapped(self, iter_wraps):
next_wrap = next(iter_wraps, None)
if next_wrap:
def _func(*args, **kwargs):
return next_wrap(self._wrapped(iter_wraps), *args, **kwargs)
else:
_func = self._func
return _func
def __get__(self, instance, cls=None):
if instance is None:
return self
else:
return types.MethodType(self, instance)
# Override the original Context.reply with a wrappable utility
reply = LionContext.wrappable_util(Context.reply)

View File

@@ -1,5 +1,5 @@
from LionModule import LionModule from LionModule import LionModule
from LionContext import register_reply_callback, unregister_reply_callback from LionContext import LionContext
from core.lion import Lion from core.lion import Lion
from .utils import get_last_voted_timestamp, lion_loveemote, lion_yayemote from .utils import get_last_voted_timestamp, lion_loveemote, lion_yayemote
@@ -11,32 +11,33 @@ upvote_info = "You have a boost available {}, to support our project and earn **
@module.launch_task @module.launch_task
async def register_hook(client): async def attach_topgg_webhook(client):
if client.shard_id == 0:
init_webhook() init_webhook()
register_reply_callback(reply) client.log("Attached top.gg voiting webhook.", context="TOPGG")
@module.launch_task
async def register_hook(client):
LionContext.reply.add_wrapper(topgg_reply_wrapper)
Lion.register_economy_bonus(economy_bonus) Lion.register_economy_bonus(economy_bonus)
client.log("Registered LionContext reply util hook.", context="Topgg") client.log("Loaded top.gg hooks.", context="TOPGG")
@module.unload_task @module.unload_task
async def unregister_hook(client): async def unregister_hook(client):
unregister_reply_callback(reply)
Lion.unregister_economy_bonus(economy_bonus) Lion.unregister_economy_bonus(economy_bonus)
LionContext.reply.remove_wrapper(topgg_reply_wrapper.__name__)
client.log("Unregistered LionContext reply util hook.", context="Topgg") client.log("Unloaded top.gg hooks.", context="TOPGG")
def reply(util_func, *args, **kwargs): async def topgg_reply_wrapper(func, *args, suggest_vote=True, **kwargs):
# *args will have LionContext if suggest_vote and not get_last_voted_timestamp(args[0].author.id):
# **kwargs should have the actual reply() call's extra arguments
if not get_last_voted_timestamp(args[0].author.id):
args = list(args)
upvote_info_formatted = upvote_info.format(lion_yayemote, args[0].best_prefix, lion_loveemote) upvote_info_formatted = upvote_info.format(lion_yayemote, args[0].best_prefix, lion_loveemote)
if 'embed' in kwargs: if 'embed' in kwargs:
# Add message as an extra embed field
kwargs['embed'].add_field( kwargs['embed'].add_field(
name="\u200b", name="\u200b",
value=( value=(
@@ -44,16 +45,17 @@ def reply(util_func, *args, **kwargs):
), ),
inline=False inline=False
) )
elif 'content' in args and args['content'] and len(args['content']) + len(upvote_info_formatted) < 1998: else:
args['content'] += '\n\n' + upvote_info_formatted # Add message to content
if 'content' in kwargs and kwargs['content'] and len(kwargs['content']) + len(upvote_info_formatted) < 1998:
kwargs['content'] += '\n\n' + upvote_info_formatted
elif len(args) > 1 and len(args[1]) + len(upvote_info_formatted) < 1998: elif len(args) > 1 and len(args[1]) + len(upvote_info_formatted) < 1998:
args = list(args)
args[1] += '\n\n' + upvote_info_formatted args[1] += '\n\n' + upvote_info_formatted
else: else:
args['content'] = '\n\n' + upvote_info_formatted kwargs['content'] = upvote_info_formatted
args = tuple(args) return await func(*args, **kwargs)
return [args, kwargs]
def economy_bonus(lion): def economy_bonus(lion):

View File

@@ -461,41 +461,3 @@ async def ask(ctx, msg, timeout=30, use_msg=None, del_on_timeout=False):
if result in ["n", "no"]: if result in ["n", "no"]:
return 0 return 0
return 1 return 1
# this reply() will be overide baseContext's reply with LionContext's, whcih can
# hook pre_execution of any util.
# Using this system, Module now have much power to change Context's utils
@LionContext.util
async def reply(ctx, content=None, allow_everyone=False, **kwargs):
"""
Helper function to reply in the current channel.
"""
if not allow_everyone:
if content:
content = lib.sterilise_content(content)
message = await ctx.ch.send(content=content, **kwargs)
ctx.sent_messages.append(message)
return message
# this reply() will be overide baseContext's reply
@LionContext.util
async def error_reply(ctx, error_str):
"""
Notify the user of a user level error.
Typically, this will occur in a red embed, posted in the command channel.
"""
embed = discord.Embed(
colour=discord.Colour.red(),
description=error_str,
timestamp=datetime.datetime.utcnow()
)
try:
message = await ctx.ch.send(embed=embed)
ctx.sent_messages.append(message)
return message
except discord.Forbidden:
message = await ctx.reply(error_str)
ctx.sent_messages.append(message)
return message