diff --git a/__init__.py b/__init__.py index e69de29..48aad58 100644 --- a/__init__.py +++ b/__init__.py @@ -0,0 +1 @@ +from .plugin import * diff --git a/plugin/__init__.py b/plugin/__init__.py index e69de29..e7db9c1 100644 --- a/plugin/__init__.py +++ b/plugin/__init__.py @@ -0,0 +1,7 @@ +import logging + +logger = logging.getLogger(__name__) + +from .twitch import setup as twitch_setup + +__all__ = ("twitch_setup",) diff --git a/plugin/twitch/__init__.py b/plugin/twitch/__init__.py index e69de29..b17d734 100644 --- a/plugin/twitch/__init__.py +++ b/plugin/twitch/__init__.py @@ -0,0 +1,7 @@ +from .. import logger + + +async def setup(bot): + from .component import ExecComponent + + await bot.add_component(ExecComponent(bot)) diff --git a/plugin/twitch/component.py b/plugin/twitch/component.py new file mode 100644 index 0000000..5b88e18 --- /dev/null +++ b/plugin/twitch/component.py @@ -0,0 +1,84 @@ +import asyncio +import builtins +import sys +import ast +import inspect +import traceback + +from io import StringIO +import types +from typing import Any, Callable + +import twitchio +from twitchio.ext import commands as cmds + +from meta import Bot, Context, conf +from meta.logger import log_wrap + +from . import logger + + +def mk_print(fp: StringIO) -> Callable[..., None]: + def _print(*args, file: Any = fp, **kwargs): + return print(*args, file=file, **kwargs) + + return _print + + +class ExecComponent(cmds.Component): + def __init__(self, bot: Bot): + self.bot = bot + + @log_wrap(action="Code Exec") + async def _async(self, code: str, **kwargs): + logger.info(f"Running code from exec: {code}") + output = StringIO() + printer = mk_print(output) + + scope: dict[str, Any] = dict(sys.modules) + scope["__builtins__"] = builtins + scope.update(builtins.__dict__) + scope.update(kwargs) + scope["bot"] = self.bot + scope["print"] = printer + + try: + compiled = compile( + code, "Code Async", "exec", ast.PyCF_ALLOW_TOP_LEVEL_AWAIT + ) + func = types.FunctionType(compiled, scope) + + ret = func() + if inspect.iscoroutine(ret): + ret = await ret + if ret is not None: + printer(repr(ret)) + except Exception: + _, exc, tb = sys.exc_info() + # printer("".join(traceback.format_tb(tb))) + printer(f"{type(exc).__name__}: {exc}") + + result = output.getvalue().strip() + logger.info(f"Exec complete, output: {result}") + return result + + @cmds.command(name="async") + async def cmd_async(self, ctx: Context, *, args: str): + """ + Execute some code inside the bot context + """ + ownerid = conf.bot.get("owner_id").strip() + if not (ownerid and ctx.author.id == ownerid): + await ctx.reply("Sorry, you don't have sufficient permissions for this.") + return + + result = await self._async(args, ctx=ctx) + if len(result) > 500: + # TODO: Upload to a pastebin + await ctx.reply( + "Output too long to display! Please check the logs. (Pastebin coming soon.)" + ) + elif result: + await ctx.reply(result) + else: + await ctx.reply("Exec complete with no output.")