(logging): Add multiprocess support for logging.

This commit is contained in:
2023-07-18 10:35:10 +03:00
parent e54c6bc62d
commit f4769b407e
5 changed files with 51 additions and 32 deletions

View File

@@ -3,7 +3,8 @@ import logging
import asyncio
from typing import List
from logging.handlers import QueueListener, QueueHandler
from queue import SimpleQueue
import queue
import multiprocessing
from contextlib import contextmanager
from io import StringIO
from functools import wraps
@@ -199,6 +200,7 @@ class WebHookHandler(logging.StreamHandler):
return self.loop
def emit(self, record):
self.format(record)
self.get_loop().call_soon_threadsafe(self._post, record)
def _post(self, record):
@@ -292,35 +294,42 @@ if webhook := conf.logging['critical_log']:
handler.setLevel(logging.CRITICAL)
handlers.append(handler)
if handlers:
# First create a separate loop to run the handlers on
import threading
def run_loop(loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=lambda: run_loop(loop))
loop_thread.daemon = True
loop_thread.start()
for handler in handlers:
handler.loop = loop
queue: SimpleQueue[logging.LogRecord] = SimpleQueue()
def make_queue_handler(queue):
qhandler = QueueHandler(queue)
qhandler.setLevel(logging.INFO)
qhandler.addFilter(ContextInjection())
# qhandler.addFilter(ThreadFilter('MainThread'))
logger.addHandler(qhandler)
return qhandler
listener = QueueListener(
queue, *handlers, respect_handler_level=True
)
listener.start()
def setup_main_logger(multiprocess=False):
q = multiprocessing.Queue() if multiprocess else queue.SimpleQueue()
if handlers:
# First create a separate loop to run the handlers on
import threading
def run_loop(loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=lambda: run_loop(loop))
loop_thread.daemon = True
loop_thread.start()
for handler in handlers:
handler.loop = loop
qhandler = make_queue_handler(q)
# qhandler.addFilter(ThreadFilter('MainThread'))
logger.addHandler(qhandler)
listener = QueueListener(
q, *handlers, respect_handler_level=True
)
listener.start()
return q