diff --git a/src/meta/logger.py b/src/meta/logger.py index 9ace46b1..ddd7121a 100644 --- a/src/meta/logger.py +++ b/src/meta/logger.py @@ -17,6 +17,7 @@ from .config import conf from . import sharding from .context import context from utils.lib import utc_now +from utils.ratelimits import Bucket, BucketOverFull, BucketFull log_logger = logging.getLogger(__name__) @@ -258,7 +259,7 @@ class LocalQueueHandler(QueueHandler): class WebHookHandler(logging.StreamHandler): - def __init__(self, webhook_url, prefix="", batch=False, loop=None): + def __init__(self, webhook_url, prefix="", batch=True, loop=None): super().__init__() self.webhook_url = webhook_url self.prefix = prefix @@ -270,6 +271,12 @@ class WebHookHandler(logging.StreamHandler): self.last_batched = None self.waiting = [] + self.bucket = Bucket(20, 40) + self.ignored = 0 + + self.session = None + self.webhook = None + def get_loop(self): if self.loop is None: self.loop = asyncio.new_event_loop() @@ -281,8 +288,14 @@ class WebHookHandler(logging.StreamHandler): self.get_loop().call_soon_threadsafe(self._post, record) def _post(self, record): + if self.session is None: + self.setup() asyncio.create_task(self.post(record)) + def setup(self): + self.session = aiohttp.ClientSession() + self.webhook = Webhook.from_url(self.webhook_url, session=self.session) + async def post(self, record): log_context.set("Webhook Logger") log_action_stack.set(("Logging",)) @@ -314,7 +327,7 @@ class WebHookHandler(logging.StreamHandler): else: await self._send(message, as_file=as_file) except Exception as ex: - print(ex) + print(f"Unexpected error occurred while logging to webhook: {repr(ex)}", file=sys.stderr) async def _schedule_batched(self): if self.batch_task is not None and not (self.batch_task.done() or self.batch_task.cancelled()): @@ -327,7 +340,7 @@ class WebHookHandler(logging.StreamHandler): except asyncio.CancelledError: return except Exception as ex: - print(ex) + print(f"Unexpected error occurred while scheduling batched webhook log: {repr(ex)}", file=sys.stderr) async def _send_batched_now(self): if self.batch_task is not None and not self.batch_task.done(): @@ -342,18 +355,36 @@ class WebHookHandler(logging.StreamHandler): await self._send(batched) async def _send(self, message, as_file=False): - async with aiohttp.ClientSession() as session: - webhook = Webhook.from_url(self.webhook_url, session=session) - if as_file or len(message) > 1900: - with StringIO(message) as fp: - fp.seek(0) - await webhook.send( - f"{self.prefix}\n`{message.splitlines()[0]}`", - file=File(fp, filename="logs.md"), - username=log_app.get() - ) - else: - await webhook.send(self.prefix + '\n' + message, username=log_app.get()) + try: + self.bucket.request() + except BucketOverFull: + # Silently ignore + self.ignored += 1 + return + except BucketFull: + logger.warning( + f"Live logging webhook {self.webhook.id} going too fast! " + "Ignoring records until rate slows down." + ) + self.ignored += 1 + return + else: + if self.ignored > 0: + logger.warning( + f"{self.ignored} live logging records on webhook {self.webhook.id} skipped, continuing." + ) + self.ignored = 0 + + if as_file or len(message) > 1900: + with StringIO(message) as fp: + fp.seek(0) + await self.webhook.send( + f"{self.prefix}\n`{message.splitlines()[0]}`", + file=File(fp, filename="logs.md"), + username=log_app.get() + ) + else: + await self.webhook.send(self.prefix + '\n' + message, username=log_app.get()) handlers = [] @@ -362,8 +393,8 @@ if webhook := conf.logging['general_log']: handlers.append(handler) if webhook := conf.logging['error_log']: - handler = WebHookHandler(webhook, prefix=conf.logging['error_prefix'], batch=False) - handler.setLevel(logging.ERROR) + handler = WebHookHandler(webhook, prefix=conf.logging['error_prefix'], batch=True) + handler.setLevel(logging.WARNING) handlers.append(handler) if webhook := conf.logging['critical_log']: diff --git a/src/utils/ratelimits.py b/src/utils/ratelimits.py index 21c75160..56f25728 100644 --- a/src/utils/ratelimits.py +++ b/src/utils/ratelimits.py @@ -67,9 +67,9 @@ class Bucket: def request(self): self._leak() - if self._level + 1 > self.max_level + 1: + if self._level > self.max_level: raise BucketOverFull - elif self._level + 1 > self.max_level: + elif self._level == self.max_level: self._level += 1 if self._last_full: raise BucketOverFull