From e3d6e21d83d65e5b2eff75af295f1b0a8ef50877 Mon Sep 17 00:00:00 2001 From: Interitio Date: Mon, 25 Aug 2025 14:45:34 +1000 Subject: [PATCH] Add exec cog. --- src/modules/__init__.py | 1 + src/modules/sysadmin/__init__.py | 5 + src/modules/sysadmin/exec_cog.py | 336 +++++++++++++++++++++++++++++++ 3 files changed, 342 insertions(+) create mode 100644 src/modules/sysadmin/__init__.py create mode 100644 src/modules/sysadmin/exec_cog.py diff --git a/src/modules/__init__.py b/src/modules/__init__.py index 07e226e..3f6887e 100644 --- a/src/modules/__init__.py +++ b/src/modules/__init__.py @@ -1,6 +1,7 @@ this_package = 'modules' active = [ + '.sysadmin', ] diff --git a/src/modules/sysadmin/__init__.py b/src/modules/sysadmin/__init__.py new file mode 100644 index 0000000..4a96ce6 --- /dev/null +++ b/src/modules/sysadmin/__init__.py @@ -0,0 +1,5 @@ + +async def setup(bot): + from .exec_cog import Exec + + await bot.add_cog(Exec(bot)) diff --git a/src/modules/sysadmin/exec_cog.py b/src/modules/sysadmin/exec_cog.py new file mode 100644 index 0000000..776292a --- /dev/null +++ b/src/modules/sysadmin/exec_cog.py @@ -0,0 +1,336 @@ +import io +import ast +import sys +import types +import asyncio +import traceback +import builtins +import inspect +import logging +from io import StringIO + +from typing import Callable, Any, Optional + +from enum import Enum + +import discord +from discord.ext import commands +from discord.ext.commands.errors import CheckFailure +from discord.ui import TextInput, View +from discord.ui.button import button +import discord.app_commands as appcmd + +from meta.logger import logging_context, log_wrap +from meta import conf +from meta.context import context, ctx_bot +from meta.LionContext import LionContext +from meta.LionCog import LionCog +from meta.LionBot import LionBot + +from utils.ui import FastModal, input + +from wards import sys_admin + + +def _(arg): return arg + +def _p(ctx, arg): return arg + + +logger = logging.getLogger(__name__) + +class ExecModal(FastModal, title="Execute"): + code: TextInput = TextInput( + label="Code to execute", + style=discord.TextStyle.long, + required=True + ) + + +class ExecStyle(Enum): + EXEC = 'exec' + EVAL = 'eval' + + +class ExecUI(View): + def __init__(self, ctx, code=None, style=ExecStyle.EXEC, ephemeral=True) -> None: + super().__init__() + + self.ctx: LionContext = ctx + self.interaction: Optional[discord.Interaction] = ctx.interaction + self.code: Optional[str] = code + self.style: ExecStyle = style + self.ephemeral: bool = ephemeral + + self._modal: Optional[ExecModal] = None + self._msg: Optional[discord.Message] = None + + async def interaction_check(self, interaction: discord.Interaction): + """Only allow the original author to use this View""" + if interaction.user.id != self.ctx.author.id: + await interaction.response.send_message( + ("You cannot use this interface!"), + ephemeral=True + ) + return False + else: + return True + + async def run(self): + if self.code is None: + if (interaction := self.interaction) is not None: + self.interaction = None + await interaction.response.send_modal(self.get_modal()) + await self.wait() + else: + # Complain + # TODO: error_reply + await self.ctx.reply("Pls give code.") + else: + await self.interaction.response.defer(thinking=True, ephemeral=self.ephemeral) + await self.compile() + await self.wait() + + @button(label="Recompile") + async def recompile_button(self, interaction, butt): + # Interaction response with modal + await interaction.response.send_modal(self.get_modal()) + + @button(label="Show Source") + async def source_button(self, interaction, butt): + if len(self.code) > 1900: + # Send as file + with StringIO(self.code) as fp: + fp.seek(0) + file = discord.File(fp, filename="source.py") + await interaction.response.send_message(file=file, ephemeral=True) + else: + # Send as message + await interaction.response.send_message( + content=f"```py\n{self.code}```", + ephemeral=True + ) + + def create_modal(self) -> ExecModal: + modal = ExecModal() + + @modal.submit_callback() + async def exec_submit(interaction: discord.Interaction): + if self.interaction is None: + self.interaction = interaction + await interaction.response.defer(thinking=True) + else: + await interaction.response.defer() + + # Set code + self.code = modal.code.value + + # Call compile + await self.compile() + + return modal + + def get_modal(self): + self._modal = self.create_modal() + self._modal.code.default = self.code + return self._modal + + async def compile(self): + # Call _async + result = await _async(self.code, style=self.style.value) + + # Display output + await self.show_output(result) + + async def show_output(self, output): + # Format output + # If output message exists and not ephemeral, edit + # Otherwise, send message, add buttons + if len(output) > 1900: + # Send as file + with StringIO(output) as fp: + fp.seek(0) + args = { + 'content': None, + 'attachments': [discord.File(fp, filename="output.md")] + } + else: + args = { + 'content': f"```md\n{output}```", + 'attachments': [] + } + + if self._msg is None: + if self.interaction is not None: + msg = await self.interaction.edit_original_response(**args, view=self) + else: + # Send new message + if args['content'] is None: + args['file'] = args.pop('attachments')[0] + msg = await self.ctx.reply(**args, ephemeral=self.ephemeral, view=self) + + if not self.ephemeral: + self._msg = msg + else: + if self.interaction is not None: + await self.interaction.edit_original_response(**args, view=self) + else: + # Edit message + await self._msg.edit(**args) + + +def mk_print(fp: io.StringIO) -> Callable[..., None]: + def _print(*args, file: Any = fp, **kwargs): + return print(*args, file=file, **kwargs) + return _print + + +def mk_status_printer(bot, printer): + async def _status(details=False): + if details: + status = await bot.system_monitor.get_overview() + else: + status = await bot.system_monitor.get_summary() + printer(status) + return status + return _status + + +@log_wrap(action="Code Exec") +async def _async(to_eval: str, style='exec'): + newline = '\n' * ('\n' in to_eval) + logger.info( + f"Exec code with {style}: {newline}{to_eval}" + ) + + output = io.StringIO() + _print = mk_print(output) + + scope: dict[str, Any] = dict(sys.modules) + scope['__builtins__'] = builtins + scope.update(builtins.__dict__) + scope['ctx'] = ctx = context.get() + scope['bot'] = ctx_bot.get() + scope['print'] = _print # type: ignore + scope['print_status'] = mk_status_printer(scope['bot'], _print) + + try: + if ctx and ctx.message: + source_str = f"" + elif ctx and ctx.interaction: + source_str = f"" + else: + source_str = "Unknown async" + + code = compile( + to_eval, + source_str, + style, + ast.PyCF_ALLOW_TOP_LEVEL_AWAIT + ) + func = types.FunctionType(code, scope) + + ret = func() + if inspect.iscoroutine(ret): + ret = await ret + if ret is not None: + _print(repr(ret)) + except Exception: + _, exc, tb = sys.exc_info() + _print("".join(traceback.format_tb(tb))) + _print(f"{type(exc).__name__}: {exc}") + + result = output.getvalue().strip() + newline = '\n' * ('\n' in result) + logger.info( + f"Exec complete, output: {newline}{result}" + ) + return result + + +class Exec(LionCog): + guild_ids = conf.bot.getintlist('admin_guilds') + + def __init__(self, bot: LionBot): + self.bot = bot + + async def cog_check(self, ctx: LionContext) -> bool: # type: ignore + passed = await sys_admin(ctx.bot, ctx.author.id) + if passed: + return True + else: + raise CheckFailure( + "You must be a bot owner to do this!" + ) + + @commands.hybrid_command( + name=_('async'), + description=_("Execute arbitrary code with Exec") + ) + @appcmd.describe( + string="Code to execute.", + ) + async def async_cmd(self, ctx: LionContext, + string: Optional[str] = None, + ): + await ExecUI(ctx, string, ExecStyle.EXEC, ephemeral=False).run() + + @commands.hybrid_command( + name=_('reload'), + description=_("Reload a given LionBot extension. Launches an ExecUI.") + ) + @appcmd.describe( + extension=_("Name of the extension to reload. See autocomplete for options."), + force=_("Whether to force an extension reload even if it doesn't exist.") + ) + @appcmd.guilds(*guild_ids) + async def reload_cmd(self, ctx: LionContext, extension: str, force: Optional[bool] = False): + """ + This is essentially just a friendly wrapper to reload an extension. + It is equivalent to running "await bot.reload_extension(extension)" in eval, + with a slightly nicer interface through the autocomplete and error handling. + """ + exists = (extension in self.bot.extensions) + if not (force or exists): + embed = discord.Embed(description=f"Unknown extension {extension}", colour=discord.Colour.red()) + await ctx.reply(embed=embed) + else: + # Uses an ExecUI to simplify error handling and re-execution + if exists: + string = f"await bot.reload_extension('{extension}')" + else: + string = f"await bot.load_extension('{extension}')" + await ExecUI(ctx, string, ExecStyle.EVAL).run() + + @reload_cmd.autocomplete('extension') + async def reload_extension_acmpl(self, interaction: discord.Interaction, partial: str): + keys = set(self.bot.extensions.keys()) + results = [ + appcmd.Choice(name=key, value=key) + for key in keys + if partial.lower() in key.lower() + ] + if not results: + results = [ + appcmd.Choice(name=f"No extensions found matching {partial}", value="None") + ] + return results[:25] + + @commands.hybrid_command( + name=_('shutdown'), + description=_("Shutdown (or restart) the client.") + ) + @appcmd.guilds(*guild_ids) + async def shutdown_cmd(self, ctx: LionContext): + """ + Shutdown the client. + Maybe do something friendly here? + """ + logger.info("Shutting down on admin request.") + await ctx.reply( + embed=discord.Embed( + description=f"Understood {ctx.author.mention}, cleaning up and shutting down!", + colour=discord.Colour.orange() + ) + ) + await self.bot.close()