Files
croccybot/bot/meta/interactions/manager.py

107 lines
3.4 KiB
Python

import asyncio
import logging
from datetime import datetime, timedelta
class InteractionManager:
    """
    Async context manager that tracks a group of futures under a shared,
    extendable timeout.

    Entering the manager starts a background timeout loop (unless ``timeout``
    is ``None``). Leaving it runs the cleanup handler if cleanup has not
    already happened. The cleanup, timeout and close handlers can each be
    replaced via `on_cleanup`, `on_timeout` and `on_close`.
    """

    def __init__(self, timeout=600, extend=None):
        """
        Parameters
        ----------
        timeout:
            Seconds from entering the manager until it times out.
            ``None`` disables the timeout loop entirely.
        extend:
            Seconds from "now" that `extend_timeout` pushes the expiry to.
            Falls back to ``timeout`` when falsy.
        """
        # Futures registered through `add_future`; cancelled by the default cleanup.
        self.futures = []
        # Tasks owned by the manager itself (currently only the timeout loop).
        self.self_futures = []

        # Replaceable handlers, each invoked as ``handler(manager, **kwargs)``.
        self.cleanup_function = self._cleanup
        self.timeout_function = self._timeout
        self.close_function = self._close

        self.timeout = timeout
        self.extend = extend or timeout
        # Set on `__aenter__` when a timeout is configured.
        self.expires_at = None

        # Set by the default cleanup handler once it has run.
        self.cleaned_up = asyncio.Event()

        # Strong references to fire-and-forget tasks. The event loop only
        # holds weak references to tasks, so without this a running timeout
        # handler could be garbage collected before it finishes.
        self._background_tasks = set()

    async def _timeout_loop(self):
        """
        Sleep until `expires_at` has passed, then run the timeout handler.

        The expiry is re-read after every sleep, so a call to
        `extend_timeout` while sleeping simply causes another sleep.
        Cancelling this task ends the loop quietly.
        """
        diff = (self.expires_at - datetime.now()).total_seconds()
        while True:
            try:
                await asyncio.sleep(diff)
            except asyncio.CancelledError:
                break
            diff = (self.expires_at - datetime.now()).total_seconds()
            if diff <= 0:
                # Run the handler in its own task: the default handler cancels
                # everything in `self_futures`, which includes this loop.
                task = asyncio.create_task(self.run_timeout())
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)
                break

    def extend_timeout(self):
        """
        Push the expiry to at least ``extend`` seconds from now.

        The expiry never moves earlier. Does nothing when no timeout is
        active (the manager has not been entered, or ``timeout`` is ``None``).
        """
        if self.expires_at is None:
            return
        new_expiry = max(datetime.now() + timedelta(seconds=self.extend), self.expires_at)
        self.expires_at = new_expiry

    async def wait(self):
        """
        Wait until the manager is "done".
        That is, until all the futures are done, or `cleaned_up` is set.

        Returns immediately when no futures are registered.
        """
        if not self.futures:
            # `asyncio.wait` raises ValueError on an empty collection.
            return

        closed_task = asyncio.create_task(self.cleaned_up.wait())
        futures_task = asyncio.create_task(asyncio.wait(self.futures))
        try:
            await asyncio.wait((closed_task, futures_task), return_when=asyncio.FIRST_COMPLETED)
        finally:
            # Do not leave the losing helper task pending. Cancelling
            # `futures_task` does not cancel the registered futures themselves.
            for task in (closed_task, futures_task):
                if not task.done():
                    task.cancel()

    async def __aenter__(self):
        """Start the timeout loop (if a timeout is configured) and return the manager."""
        if self.timeout is not None:
            self.expires_at = datetime.now() + timedelta(seconds=self.timeout)
            self.self_futures.append(
                asyncio.create_task(self._timeout_loop())
            )
        return self

    async def __aexit__(self, *args):
        """Run cleanup with ``exiting=True`` unless cleanup has already been recorded."""
        if not self.cleaned_up.is_set():
            await self.cleanup(exiting=True)

    async def _cleanup(self, manager, timeout=False, closing=False, exiting=False, **kwargs):
        """
        Default cleanup handler: cancel every registered future and every
        manager-owned task, then set `cleaned_up`.

        The keyword flags indicate why cleanup was requested; this default
        implementation ignores them.
        """
        for future in self.futures:
            future.cancel()
        for future in self.self_futures:
            future.cancel()
        self.cleaned_up.set()

    def on_cleanup(self, func):
        """
        Replace the cleanup handler with ``func`` and return ``func``,
        so this can be used as a decorator.

        The handler is called as ``func(manager, **kwargs)``. It replaces the
        default handler rather than wrapping it.
        """
        self.cleanup_function = func
        return func

    async def cleanup(self, **kwargs):
        """Run the cleanup handler, logging (not raising) any exception it throws."""
        try:
            await self.cleanup_function(self, **kwargs)
        except Exception:
            logging.debug("An error occurred while cleaning up the InteractionManager", exc_info=True)

    async def _timeout(self, manager, **kwargs):
        """Default timeout handler: run cleanup with ``timeout=True``."""
        await self.cleanup(timeout=True, **kwargs)

    def on_timeout(self, func):
        """
        Replace the timeout handler with ``func`` and return ``func``,
        so this can be used as a decorator.

        The handler is called as ``func(manager)``.
        """
        self.timeout_function = func
        return func

    async def run_timeout(self):
        """Run the timeout handler, logging (not raising) any exception it throws."""
        try:
            await self.timeout_function(self)
        except Exception:
            logging.debug("An error occurred while timing out the InteractionManager", exc_info=True)

    async def close(self, **kwargs):
        """
        Request closure of the manager.

        Runs the close handler, logging (not raising) any exception it throws.
        """
        try:
            await self.close_function(self, **kwargs)
        except Exception:
            logging.debug("An error occurred while closing the InteractionManager", exc_info=True)

    def on_close(self, func):
        """
        Replace the close handler with ``func`` and return ``func``,
        so this can be used as a decorator.

        The handler is called as ``func(manager, **kwargs)``.
        """
        self.close_function = func
        return func

    async def _close(self, manager, **kwargs):
        """Default close handler: run cleanup with ``closing=True``."""
        await self.cleanup(closing=True, **kwargs)

    def add_future(self, future):
        """Register ``future`` with the manager and return it unchanged."""
        self.futures.append(future)
        return future