fix(logging): Avoid livelog ratelimit.
This commit is contained in:
@@ -17,6 +17,7 @@ from .config import conf
|
|||||||
from . import sharding
|
from . import sharding
|
||||||
from .context import context
|
from .context import context
|
||||||
from utils.lib import utc_now
|
from utils.lib import utc_now
|
||||||
|
from utils.ratelimits import Bucket, BucketOverFull, BucketFull
|
||||||
|
|
||||||
|
|
||||||
log_logger = logging.getLogger(__name__)
|
log_logger = logging.getLogger(__name__)
|
||||||
@@ -258,7 +259,7 @@ class LocalQueueHandler(QueueHandler):
|
|||||||
|
|
||||||
|
|
||||||
class WebHookHandler(logging.StreamHandler):
|
class WebHookHandler(logging.StreamHandler):
|
||||||
def __init__(self, webhook_url, prefix="", batch=False, loop=None):
|
def __init__(self, webhook_url, prefix="", batch=True, loop=None):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.webhook_url = webhook_url
|
self.webhook_url = webhook_url
|
||||||
self.prefix = prefix
|
self.prefix = prefix
|
||||||
@@ -270,6 +271,12 @@ class WebHookHandler(logging.StreamHandler):
|
|||||||
self.last_batched = None
|
self.last_batched = None
|
||||||
self.waiting = []
|
self.waiting = []
|
||||||
|
|
||||||
|
self.bucket = Bucket(20, 40)
|
||||||
|
self.ignored = 0
|
||||||
|
|
||||||
|
self.session = None
|
||||||
|
self.webhook = None
|
||||||
|
|
||||||
def get_loop(self):
|
def get_loop(self):
|
||||||
if self.loop is None:
|
if self.loop is None:
|
||||||
self.loop = asyncio.new_event_loop()
|
self.loop = asyncio.new_event_loop()
|
||||||
@@ -281,8 +288,14 @@ class WebHookHandler(logging.StreamHandler):
|
|||||||
self.get_loop().call_soon_threadsafe(self._post, record)
|
self.get_loop().call_soon_threadsafe(self._post, record)
|
||||||
|
|
||||||
def _post(self, record):
|
def _post(self, record):
|
||||||
|
if self.session is None:
|
||||||
|
self.setup()
|
||||||
asyncio.create_task(self.post(record))
|
asyncio.create_task(self.post(record))
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
self.session = aiohttp.ClientSession()
|
||||||
|
self.webhook = Webhook.from_url(self.webhook_url, session=self.session)
|
||||||
|
|
||||||
async def post(self, record):
|
async def post(self, record):
|
||||||
log_context.set("Webhook Logger")
|
log_context.set("Webhook Logger")
|
||||||
log_action_stack.set(("Logging",))
|
log_action_stack.set(("Logging",))
|
||||||
@@ -314,7 +327,7 @@ class WebHookHandler(logging.StreamHandler):
|
|||||||
else:
|
else:
|
||||||
await self._send(message, as_file=as_file)
|
await self._send(message, as_file=as_file)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
print(ex)
|
print(f"Unexpected error occurred while logging to webhook: {repr(ex)}", file=sys.stderr)
|
||||||
|
|
||||||
async def _schedule_batched(self):
|
async def _schedule_batched(self):
|
||||||
if self.batch_task is not None and not (self.batch_task.done() or self.batch_task.cancelled()):
|
if self.batch_task is not None and not (self.batch_task.done() or self.batch_task.cancelled()):
|
||||||
@@ -327,7 +340,7 @@ class WebHookHandler(logging.StreamHandler):
|
|||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
return
|
return
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
print(ex)
|
print(f"Unexpected error occurred while scheduling batched webhook log: {repr(ex)}", file=sys.stderr)
|
||||||
|
|
||||||
async def _send_batched_now(self):
|
async def _send_batched_now(self):
|
||||||
if self.batch_task is not None and not self.batch_task.done():
|
if self.batch_task is not None and not self.batch_task.done():
|
||||||
@@ -342,18 +355,36 @@ class WebHookHandler(logging.StreamHandler):
|
|||||||
await self._send(batched)
|
await self._send(batched)
|
||||||
|
|
||||||
async def _send(self, message, as_file=False):
|
async def _send(self, message, as_file=False):
|
||||||
async with aiohttp.ClientSession() as session:
|
try:
|
||||||
webhook = Webhook.from_url(self.webhook_url, session=session)
|
self.bucket.request()
|
||||||
|
except BucketOverFull:
|
||||||
|
# Silently ignore
|
||||||
|
self.ignored += 1
|
||||||
|
return
|
||||||
|
except BucketFull:
|
||||||
|
logger.warning(
|
||||||
|
f"Live logging webhook {self.webhook.id} going too fast! "
|
||||||
|
"Ignoring records until rate slows down."
|
||||||
|
)
|
||||||
|
self.ignored += 1
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
if self.ignored > 0:
|
||||||
|
logger.warning(
|
||||||
|
f"{self.ignored} live logging records on webhook {self.webhook.id} skipped, continuing."
|
||||||
|
)
|
||||||
|
self.ignored = 0
|
||||||
|
|
||||||
if as_file or len(message) > 1900:
|
if as_file or len(message) > 1900:
|
||||||
with StringIO(message) as fp:
|
with StringIO(message) as fp:
|
||||||
fp.seek(0)
|
fp.seek(0)
|
||||||
await webhook.send(
|
await self.webhook.send(
|
||||||
f"{self.prefix}\n`{message.splitlines()[0]}`",
|
f"{self.prefix}\n`{message.splitlines()[0]}`",
|
||||||
file=File(fp, filename="logs.md"),
|
file=File(fp, filename="logs.md"),
|
||||||
username=log_app.get()
|
username=log_app.get()
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
await webhook.send(self.prefix + '\n' + message, username=log_app.get())
|
await self.webhook.send(self.prefix + '\n' + message, username=log_app.get())
|
||||||
|
|
||||||
|
|
||||||
handlers = []
|
handlers = []
|
||||||
@@ -362,8 +393,8 @@ if webhook := conf.logging['general_log']:
|
|||||||
handlers.append(handler)
|
handlers.append(handler)
|
||||||
|
|
||||||
if webhook := conf.logging['error_log']:
|
if webhook := conf.logging['error_log']:
|
||||||
handler = WebHookHandler(webhook, prefix=conf.logging['error_prefix'], batch=False)
|
handler = WebHookHandler(webhook, prefix=conf.logging['error_prefix'], batch=True)
|
||||||
handler.setLevel(logging.ERROR)
|
handler.setLevel(logging.WARNING)
|
||||||
handlers.append(handler)
|
handlers.append(handler)
|
||||||
|
|
||||||
if webhook := conf.logging['critical_log']:
|
if webhook := conf.logging['critical_log']:
|
||||||
|
|||||||
@@ -67,9 +67,9 @@ class Bucket:
|
|||||||
|
|
||||||
def request(self):
|
def request(self):
|
||||||
self._leak()
|
self._leak()
|
||||||
if self._level + 1 > self.max_level + 1:
|
if self._level > self.max_level:
|
||||||
raise BucketOverFull
|
raise BucketOverFull
|
||||||
elif self._level + 1 > self.max_level:
|
elif self._level == self.max_level:
|
||||||
self._level += 1
|
self._level += 1
|
||||||
if self._last_full:
|
if self._last_full:
|
||||||
raise BucketOverFull
|
raise BucketOverFull
|
||||||
|
|||||||
Reference in New Issue
Block a user