"""Owner-only chat command that executes arbitrary Python inside the bot process."""

import asyncio
import builtins
import sys
import ast
import inspect
import traceback
from io import StringIO
import types
from typing import Any, Callable

import twitchio
from twitchio.ext import commands as cmds

from meta import Bot, Context, conf
from meta.logger import log_wrap

from . import logger


def mk_print(fp: StringIO) -> Callable[..., None]:
    """Return a ``print`` replacement that writes to *fp* by default.

    The returned callable forwards all arguments to the builtin ``print``;
    a caller may still pass an explicit ``file=`` to redirect a single call.
    """

    def _print(*args, file: Any = fp, **kwargs):
        return print(*args, file=file, **kwargs)

    return _print


class ExecComponent(cmds.Component):
    """Component exposing the ``async`` command for running code in the bot."""

    def __init__(self, bot: Bot):
        self.bot = bot

    @log_wrap(action="Code Exec")
    async def _async(self, code: str, **kwargs) -> str:
        """Compile and run *code*, returning whatever it printed.

        The code runs with a global scope containing every entry of
        ``sys.modules``, all builtins, the supplied ``kwargs``, ``bot`` (this
        component's bot) and a ``print`` that captures output. Top-level
        ``await`` is allowed; if running the code yields a coroutine it is
        awaited here.

        Exceptions raised while compiling or running are not propagated:
        they are rendered as ``"<ExcType>: <message>"`` in the returned text.

        SECURITY: this executes arbitrary code with full access to the
        process. Callers are responsible for restricting who can reach it.
        """
        logger.info(f"Running code from exec: {code}")

        output = StringIO()
        printer = mk_print(output)

        # Order matters: later assignments win, so ``bot`` and ``print``
        # cannot be shadowed by a module name, a builtin, or a kwarg.
        scope: dict[str, Any] = dict(sys.modules)
        scope["__builtins__"] = builtins
        scope.update(builtins.__dict__)
        scope.update(kwargs)
        scope["bot"] = self.bot
        scope["print"] = printer

        try:
            compiled = compile(
                code,
                "Code Async",
                "exec",
                ast.PyCF_ALLOW_TOP_LEVEL_AWAIT,
            )
            # Wrapping the code object in a function lets code that uses
            # top-level ``await`` hand back a coroutine we can await below.
            func = types.FunctionType(compiled, scope)

            ret = func()
            if inspect.iscoroutine(ret):
                ret = await ret
            if ret is not None:
                printer(repr(ret))
        except Exception as exc:
            # Only the exception type and message are reported to the caller;
            # the traceback is not included in the output.
            printer(f"{type(exc).__name__}: {exc}")

        result = output.getvalue().strip()
        logger.info(f"Exec complete, output: {result}")
        return result

    @cmds.command(name="async")
    async def cmd_async(self, ctx: Context, *, args: str) -> None:
        """
        Execute some code inside the bot context
        """
        # A missing ``owner_id`` yields None from ``.get``; treat it as empty so
        # the check below denies access instead of raising AttributeError.
        ownerid = (conf.bot.get("owner_id") or "").strip()
        if not (ownerid and ctx.author.id == ownerid):
            await ctx.reply("Sorry, you don't have sufficient permissions for this.")
            return

        result = await self._async(args, ctx=ctx)
        if len(result) > 500:
            # TODO: Upload to a pastebin
            await ctx.reply(
                "Output too long to display! Please check the logs. (Pastebin coming soon.)"
            )
        elif result:
            await ctx.reply(result)
        else:
            await ctx.reply("Exec complete with no output.")