diff --git a/bot/analytics/cog.py b/bot/analytics/cog.py index 57d060e1..78c24de8 100644 --- a/bot/analytics/cog.py +++ b/bot/analytics/cog.py @@ -1,6 +1,14 @@ -from meta import LionCog, LionBot, LionContext +import logging +import discord +from discord.ext.commands import Bot, Cog, HybridCommand, HybridCommandError +from discord.ext.commands.errors import CommandInvokeError, CheckFailure +from discord.app_commands.errors import CommandInvokeError as appCommandInvokeError + +from meta import LionCog, LionBot, LionContext from meta.app import shard_talk, appname +from meta.errors import HandledException, SafeCancellation +from meta.logger import log_wrap from utils.lib import utc_now from .data import AnalyticsData @@ -9,6 +17,9 @@ from .events import ( GuildAction, GuildEvent, guild_event_handler, VoiceAction, VoiceEvent, voice_event_handler ) +from .snapshot import shard_snapshot + +logger = logging.getLogger(__name__) # TODO: Client side might be better handled as a single connection fed by a queue? 
@@ -26,10 +37,35 @@ class Analytics(LionCog):
         self.talk_guild_event = guild_event_handler.bind(shard_talk).route
         self.talk_voice_event = voice_event_handler.bind(shard_talk).route
 
+        self.talk_shard_snapshot = shard_talk.register_route()(shard_snapshot)
+
     async def cog_load(self):
         await self.data.init()
 
     @LionCog.listener()
+    @log_wrap(action='AnEvent')
+    async def on_voice_state_update(self, member, before, after):
+        if not before.channel and after.channel:
+            action = VoiceAction.JOINED  # Member joined channel
+        elif before.channel and not after.channel:
+            action = VoiceAction.LEFT  # Member left channel
+        else:
+            return  # Channel switch or mute/deafen update: not a join/leave, nothing to send
+
+        event = VoiceEvent(
+            appname=appname,
+            userid=member.id,
+            guildid=member.guild.id,
+            action=action,
+            created_at=utc_now()
+        )
+        if self.an_app not in shard_talk.peers:
+            logger.warning(f"Analytics peer not found, discarding event: {event}")
+        else:
+            await self.talk_voice_event(event).send(self.an_app, wait_for_reply=False)
+
+    @LionCog.listener()
+    @log_wrap(action='AnEvent')
     async def on_guild_join(self, guild):
         """
         Send guild join event.
@@ -40,10 +76,14 @@ class Analytics(LionCog): action=GuildAction.JOINED, created_at=utc_now() ) - await self.talk_guild_event(event).send(self.an_app, wait_for_reply=False) + if self.an_app not in shard_talk.peers: + logger.warning(f"Analytics peer not found, discarding event: {event}") + else: + await self.talk_guild_event(event).send(self.an_app, wait_for_reply=False) @LionCog.listener() - async def on_guild_leave(self, guild): + @log_wrap(action='AnEvent') + async def on_guild_remove(self, guild): """ Send guild leave event """ @@ -53,13 +93,18 @@ class Analytics(LionCog): action=GuildAction.LEFT, created_at=utc_now() ) - await self.talk_guild_event(event).send(self.an_app, wait_for_reply=False) + if self.an_app not in shard_talk.peers: + logger.warning(f"Analytics peer not found, discarding event: {event}") + else: + await self.talk_guild_event(event).send(self.an_app, wait_for_reply=False) @LionCog.listener() + @log_wrap(action='AnEvent') async def on_command_completion(self, ctx: LionContext): """ Send command completed successfully. """ + duration = utc_now() - ctx.message.created_at event = CommandEvent( appname=appname, cmdname=ctx.command.name if ctx.command else 'Unknown', @@ -67,16 +112,63 @@ class Analytics(LionCog): userid=ctx.author.id, created_at=utc_now(), status=CommandStatus.COMPLETED, - execution_time=0, + execution_time=duration.total_seconds(), guildid=ctx.guild.id if ctx.guild else None, ctxid=ctx.message.id ) - await self.talk_command_event(event).send(self.an_app, wait_for_reply=False) + if self.an_app not in shard_talk.peers: + logger.warning(f"Analytics peer not found, discarding event: {event}") + else: + await self.talk_command_event(event).send(self.an_app, wait_for_reply=False) @LionCog.listener() + @log_wrap(action='AnEvent') async def on_command_error(self, ctx: LionContext, error): """ Send command failed. """ - # TODO: Add command error field? - ... 
+ duration = utc_now() - ctx.message.created_at + status = CommandStatus.FAILED + err_type = None + try: + err_type = repr(error) + raise error + except (HybridCommandError, CommandInvokeError, appCommandInvokeError): + original = error.original + try: + err_type = repr(original) + if isinstance(original, (HybridCommandError, CommandInvokeError, appCommandInvokeError)): + raise original.original + else: + raise original + except HandledException: + status = CommandStatus.CANCELLED + except SafeCancellation: + status = CommandStatus.CANCELLED + except discord.Forbidden: + status = CommandStatus.CANCELLED + except discord.HTTPException: + status = CommandStatus.CANCELLED + except Exception: + status = CommandStatus.FAILED + except CheckFailure: + status = CommandStatus.CANCELLED + except Exception: + status = CommandStatus.FAILED + + event = CommandEvent( + appname=appname, + cmdname=ctx.command.name if ctx.command else 'Unknown', + cogname=ctx.cog.qualified_name if ctx.cog else None, + userid=ctx.author.id, + created_at=utc_now(), + status=status, + error=err_type, + execution_time=duration.total_seconds(), + guildid=ctx.guild.id if ctx.guild else None, + ctxid=ctx.message.id + ) + if self.an_app not in shard_talk.peers: + logger.warning(f"Analytics peer not found, discarding event: {event}") + else: + await self.talk_command_event(event).send(self.an_app, wait_for_reply=False) diff --git a/bot/analytics/data.py b/bot/analytics/data.py index 2d906d1b..e58c8968 100644 --- a/bot/analytics/data.py +++ b/bot/analytics/data.py @@ -60,9 +60,10 @@ class AnalyticsData(Registry, name='analytics'): snapshotid SERIAL PRIMARY KEY, appname TEXT NOT NULL REFERENCES bot_config (appname), guild_count INTEGER NOT NULL, - study_time BIGINT NOT NULL, + member_count INTEGER NOT NULL, + user_count INTEGER NOT NULL, in_voice INTEGER NOT NULL, - _created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc') + created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc') ); """ 
_schema_ = 'analytics' @@ -71,8 +72,9 @@ class AnalyticsData(Registry, name='analytics'): snapshotid = Integer(primary=True) appname = String() guild_count = Integer() + member_count = Integer() + user_count = Integer() in_voice = Integer() - study_time = Integer() created_at = Timestamp() class Events(RowModel): @@ -105,7 +107,7 @@ class AnalyticsData(Registry, name='analytics'): cogname TEXT, userid BIGINT NOT NULL, status analytics.CommandStatus NOT NULL, - execution_time INTEGER NOT NULL + execution_time REAL NOT NULL ) INHERITS (analytics.events); """ _schema_ = 'analytics' @@ -121,7 +123,8 @@ class AnalyticsData(Registry, name='analytics'): cogname = String() userid = Integer() status: Column[CommandStatus] = Column() - execution_time = Integer() + error = String() + execution_time: Column[float] = Column() class Guilds(RowModel): """ @@ -150,7 +153,7 @@ class AnalyticsData(Registry, name='analytics'): CREATE TABLE analytics.voice_sessions( userid BIGINT NOT NULL, action analytics.VoiceAction NOT NULL - ); + ) INHERITS (analytics.events); """ _schema_ = 'analytics' _tablename_ = 'voice_sessions' @@ -171,7 +174,7 @@ class AnalyticsData(Registry, name='analytics'): CREATE TABLE analytics.gui_renders( cardname TEXT NOT NULL, duration INTEGER NOT NULL - ); + ) INHERITS (analytics.events); """ _schema_ = 'analytics' _tablename_ = 'gui_renders' diff --git a/bot/analytics/events.py b/bot/analytics/events.py index dbd8e4fb..c1d0b4ca 100644 --- a/bot/analytics/events.py +++ b/bot/analytics/events.py @@ -1,29 +1,35 @@ import asyncio import datetime +import logging from collections import namedtuple from typing import NamedTuple, Optional, Generic, Type, TypeVar from meta.ipc import AppRoute, AppClient +from meta.logger import logging_context, log_wrap + from data import RowModel from .data import AnalyticsData, CommandStatus, VoiceAction, GuildAction +logger = logging.getLogger(__name__) + """ TODO Snapshot type? Incremental or manual? 
Request snapshot route will require all shards to be online
+Update batch size before release, or put it in the config
 """
 
 T = TypeVar('T')
 
 
 class EventHandler(Generic[T]):
-    batch_size = 1
-
-    def __init__(self, route_name: str, model: Type[RowModel], struct: Type[T]):
+    def __init__(self, route_name: str, model: Type[RowModel], struct: Type[T], batchsize: int = 20):
         self.model = model
         self.struct = struct
 
+        self.batch_size = batchsize
+
         self.route_name = route_name
         self._route: Optional[AppRoute] = None
         self._client: Optional[AppClient] = None
@@ -39,27 +45,46 @@ class EventHandler(Generic[T]):
         return self._route
 
     async def handle_event(self, data):
-        await self.queue.put(data)
+        try:
+            self.queue.put_nowait(data)  # awaiting put() would wait for space and never raise QueueFull
+        except asyncio.QueueFull:
+            logger.warning(
+                f"Queue on event handler {self.route_name} is full! Discarding event {data}"
+            )
 
     async def consumer(self):
-        while True:
-            try:
-                item = await self.queue.get()
-                self.batch.append(item)
-                if len(self.batch) > self.batch_size:
-                    await self.process_batch()
-            except asyncio.CancelledError:
-                raise
-            except asyncio.TimeoutError:
-                raise
+        with logging_context(action='consumer'):
+            while True:
+                try:
+                    item = await self.queue.get()
+                    self.batch.append(item)
+                    if len(self.batch) > self.batch_size:
+                        await self.process_batch()
+                except asyncio.CancelledError:
+                    # Try and process the last batch
+                    logger.info(
+                        f"Event handler {self.route_name} received cancellation signal! "
+                        "Trying to process last batch."
+                    )
+                    if self.batch:
+                        await self.process_batch()
+                    raise
+                except Exception:
+                    logger.exception(
+                        f"Event handler {self.route_name} received unhandled error."
+                        " Ignoring and continuing cautiously."
+                    )
+                    pass
 
     async def process_batch(self):
-        # TODO: copy syntax might be more efficient here
-        await self.model.table.insert_many(
-            self.struct._fields,
-            *map(tuple, self.batch)
-        )
-        self.batch.clear()
+        with logging_context(action='batch'):
+            logger.debug("Processing Batch")
+            # TODO: copy syntax might be more efficient here
+            await self.model.table.insert_many(
+                self.struct._fields,
+                *map(tuple, self.batch)
+            )
+            self.batch.clear()
 
     def bind(self, client: AppClient):
         """
@@ -81,14 +106,21 @@ class EventHandler(Generic[T]):
             raise ValueError("Not attached, cannot detach!")
         self._client.routes.pop(self.route_name, None)
         self._route = None
+        logger.info(
+            f"EventHandler {self.route_name} has been unbound from the ShardTalk client."
+        )
         return self
 
     async def attach(self, client: AppClient):
         """
         Attach to a ShardTalk client and start listening.
         """
-        self.bind(client)
-        self._consumer_task = asyncio.create_task(self.consumer())
+        with logging_context(action=self.route_name):
+            self.bind(client)
+            self._consumer_task = asyncio.create_task(self.consumer())
+            logger.info(
+                f"EventHandler {self.route_name} is listening for incoming events."
+            )
         return self
 
     async def detach(self):
@@ -99,6 +131,9 @@ class EventHandler(Generic[T]):
         if self._consumer_task and not self._consumer_task.done():
             self._consumer_task.cancel()
         self._consumer_task = None
+        logger.info(
+            f"EventHandler {self.route_name} has detached."
+ ) return self @@ -108,14 +143,15 @@ class CommandEvent(NamedTuple): userid: int created_at: datetime.datetime status: CommandStatus - execution_time: int + execution_time: float + error: Optional[str] = None cogname: Optional[str] = None guildid: Optional[int] = None ctxid: Optional[int] = None command_event_handler: EventHandler[CommandEvent] = EventHandler( - 'command_event', AnalyticsData.Commands, CommandEvent + 'command_event', AnalyticsData.Commands, CommandEvent, batchsize=1 ) @@ -127,17 +163,18 @@ class GuildEvent(NamedTuple): guild_event_handler: EventHandler[GuildEvent] = EventHandler( - 'guild_event', AnalyticsData.Guilds, GuildEvent + 'guild_event', AnalyticsData.Guilds, GuildEvent, batchsize=0 ) class VoiceEvent(NamedTuple): appname: str guildid: int + userid: int action: VoiceAction created_at: datetime.datetime voice_event_handler: EventHandler[VoiceEvent] = EventHandler( - 'voice_event', AnalyticsData.VoiceSession, VoiceEvent + 'voice_event', AnalyticsData.VoiceSession, VoiceEvent, batchsize=5 ) diff --git a/bot/analytics/server.py b/bot/analytics/server.py index 5356cd02..b81bcfe4 100644 --- a/bot/analytics/server.py +++ b/bot/analytics/server.py @@ -1,13 +1,17 @@ import asyncio import logging +from typing import Optional from meta import conf, appname -from meta.logger import log_context, log_action_stack, logging_context, log_app +from meta.logger import log_context, log_action_stack, logging_context, log_app, log_wrap from meta.ipc import AppClient +from meta.app import appname_from_shard +from meta.sharding import shard_count from data import Database from .events import command_event_handler, guild_event_handler, voice_event_handler +from .snapshot import shard_snapshot, ShardSnapshot from .data import AnalyticsData @@ -17,28 +21,108 @@ for name in conf.config.options('LOGGING_LEVELS', no_defaults=True): logging.getLogger(name).setLevel(conf.logging_levels[name]) -db = Database(conf.data['args']) +class AnalyticsServer: + # TODO: Move these 
to the config + # How often to request snapshots + snap_period = 120 + # How soon after a snapshot failure (e.g. not all shards online) to retry + snap_retry_period = 10 + def __init__(self) -> None: + self.db = Database(conf.data['args']) + self.data = self.db.load_registry(AnalyticsData()) -async def main(): - log_action_stack.set(['Analytics']) - log_app.set(conf.analytics['appname']) + self.event_handlers = [ + command_event_handler, + guild_event_handler, + voice_event_handler + ] - async with await db.connect(): - db.load_registry(AnalyticsData()) - analytic_talk = AppClient( + self.talk = AppClient( conf.analytics['appname'], appname, {'host': conf.analytics['server_host'], 'port': int(conf.analytics['server_port'])}, {'host': conf.appipc['server_host'], 'port': int(conf.appipc['server_port'])} ) - await analytic_talk.connect() - cmd = await command_event_handler.attach(analytic_talk) - guild = await guild_event_handler.attach(analytic_talk) - voice = await voice_event_handler.attach(analytic_talk) - logger.info("Finished initialising, waiting for events.") - await asyncio.gather(cmd._consumer_task, guild._consumer_task, voice._consumer_task) + self.talk_shard_snapshot = self.talk.register_route()(shard_snapshot) + + self._snap_task: Optional[asyncio.Task] = None + + async def attach_event_handlers(self): + for handler in self.event_handlers: + await handler.attach(self.talk) + + @log_wrap(action='Snap') + async def take_snapshot(self): + # Check if all the shards are registered on shard_talk + expected_peers = [appname_from_shard(i) for i in range(0, shard_count)] + if missing := [peer for peer in expected_peers if peer not in self.talk.peers]: + # We are missing peer(s)! 
+ logger.warning( + f"Analytics could not take snapshot because peers are missing: {', '.join(missing)}" + ) + return False + + # Everyone is here, ask for shard snapshots + results = await self.talk_shard_snapshot().broadcast() + + # Make sure everyone sent results and there were no exceptions (e.g. concurrency) + if not all(result is not None and not isinstance(result, Exception) for result in results.values()): + # This should essentially never happen + # Either some of the shards could not make a snapshot (e.g. Discord client issues) + # or they disconnected in the process. + logger.warning( + f"Analytics could not take snapshot because some peers failed! Partial snapshot: {results}" + ) + return False + + # Now we have a dictionary of shard snapshots, aggregate, pull in remaining data, and store. + # TODO Possibly move this out into snapshots.py? + aggregate = {field: 0 for field in ShardSnapshot._fields} + for result in results.values(): + for field, num in result._asdict().items(): + aggregate[field] += num + + row = await self.data.Snapshots.create( + appname=appname, + guild_count=aggregate['guild_count'], + member_count=aggregate['member_count'], + user_count=aggregate['user_count'], + in_voice=aggregate['voice_count'], + ) + logger.info(f"Created snapshot: {row.data!r}") + return True + + @log_wrap(action='SnapLoop') + async def snapshot_loop(self): + while True: + try: + result = await self.take_snapshot() + if result: + await asyncio.sleep(self.snap_period) + else: + logger.info("Snapshot failed, retrying after %d seconds", self.snap_retry_period) + await asyncio.sleep(self.snap_retry_period) + except asyncio.CancelledError: + logger.info("Snapshot loop cancelled, closing.") + return + except Exception: + logger.exception( + "Unhandled exception during snapshot loop. Ignoring and continuing cautiously." 
+ ) + await asyncio.sleep(self.snap_retry_period) + + async def run(self): + log_action_stack.set(['Analytics']) + log_app.set(conf.analytics['appname']) + + async with await self.db.connect(): + await self.talk.connect() + await self.attach_event_handlers() + self._snap_task = asyncio.create_task(self.snapshot_loop()) + await asyncio.gather(*(handler._consumer_task for handler in self.event_handlers)) if __name__ == '__main__': - asyncio.run(main()) + server = AnalyticsServer() + asyncio.run(server.run()) diff --git a/bot/analytics/snapshot.py b/bot/analytics/snapshot.py new file mode 100644 index 00000000..565280e1 --- /dev/null +++ b/bot/analytics/snapshot.py @@ -0,0 +1,28 @@ +from typing import NamedTuple + +from meta.context import ctx_bot + + +class ShardSnapshot(NamedTuple): + guild_count: int + voice_count: int + member_count: int + user_count: int + + +async def shard_snapshot(): + """ + Take a snapshot of the current shard. + """ + bot = ctx_bot.get() + if bot is None or not bot.is_ready(): + # We cannot take a snapshot without Bot + # Just quietly fail + return None + snap = ShardSnapshot( + guild_count=len(bot.guilds), + voice_count=sum(len(channel.members) for guild in bot.guilds for channel in guild.voice_channels), + member_count=sum(len(guild.members) for guild in bot.guilds), + user_count=len(set(m.id for guild in bot.guilds for m in guild.members)) + ) + return snap diff --git a/bot/core/cog.py b/bot/core/cog.py index f561c011..d584d537 100644 --- a/bot/core/cog.py +++ b/bot/core/cog.py @@ -45,7 +45,7 @@ class CoreCog(LionCog): shard_count=self.bot.shard_count ) self.bot.add_listener(self.shard_update_guilds, name='on_guild_join') - self.bot.add_listener(self.shard_update_guilds, name='on_guild_leave') + self.bot.add_listener(self.shard_update_guilds, name='on_guild_remove') self.bot.core = self diff --git a/bot/meta/LionBot.py b/bot/meta/LionBot.py index 585fd8dd..20dfd5da 100644 --- a/bot/meta/LionBot.py +++ b/bot/meta/LionBot.py @@ -98,7 +98,8 
@@ class LionBot(Bot): async def on_command(self, ctx: LionContext): logger.info( - f"Executing command '{ctx.command.qualified_name}' (from module '{ctx.cog.__cog_name__}') " + f"Executing command '{ctx.command.qualified_name}' " + f"(from module '{ctx.cog.qualified_name if ctx.cog else 'None'}') " f"with arguments {ctx.args} and kwargs {ctx.kwargs}.", extra={'with_ctx': True} ) @@ -109,11 +110,15 @@ class LionBot(Bot): if isinstance(ctx.command, HybridCommand) and ctx.command.app_command: cmd_str = ctx.command.app_command.to_dict() try: - raise exception + raise exception from None except (HybridCommandError, CommandInvokeError, appCommandInvokeError): - original = exception.original try: - raise original + if isinstance(exception.original, (HybridCommandError, CommandInvokeError, appCommandInvokeError)): + original = exception.original.original + raise original + else: + original = exception.original + raise original except HandledException: pass except SafeCancellation: @@ -151,7 +156,6 @@ class LionBot(Bot): except Exception: logger.exception( f"Caught an unknown CommandInvokeError while executing: {cmd_str}", - exc_info=exception, extra={'action': 'BotError', 'with_ctx': True} ) @@ -183,6 +187,5 @@ class LionBot(Bot): # Something is very wrong here, don't attempt user interaction. 
logger.exception( f"Caught an unknown top-level exception while executing: {cmd_str}", - exc_info=exception, extra={'action': 'BotError', 'with_ctx': True} ) diff --git a/data/migration/v12-13/migration.sql b/data/migration/v12-13/migration.sql index f231aaf8..b828d5fc 100644 --- a/data/migration/v12-13/migration.sql +++ b/data/migration/v12-13/migration.sql @@ -58,7 +58,8 @@ CREATE TABLE analytics.snapshots( snapshotid SERIAL PRIMARY KEY, appname TEXT NOT NULL REFERENCES bot_config (appname), guild_count INTEGER NOT NULL, - study_time BIGINT NOT NULL, + member_count INTEGER NOT NULL, + user_count INTEGER NOT NULL, in_voice INTEGER NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc') ); @@ -83,7 +84,8 @@ CREATE TABLE analytics.commands( cogname TEXT, userid BIGINT NOT NULL, status analytics.CommandStatus NOT NULL, - execution_time INTEGER NOT NULL + error TEXT, + execution_time REAL NOT NULL ) INHERITS (analytics.events); @@ -106,9 +108,9 @@ CREATE TYPE analytics.VoiceAction AS ENUM( CREATE TABLE analytics.voice_sessions( userid BIGINT NOT NULL, action analytics.VoiceAction NOT NULL -); +) INHERITS (analytics.events); CREATE TABLE analytics.gui_renders( cardname TEXT NOT NULL, duration INTEGER NOT NULL -); +) INHERITS (analytics.events); diff --git a/run_analytics.py b/run_analytics.py index e2f5a4f5..461052a3 100755 --- a/run_analytics.py +++ b/run_analytics.py @@ -4,8 +4,9 @@ import asyncio sys.path.insert(0, os.path.join(os.getcwd(), "bot")) -from bot.analytics.server import main +from bot.analytics.server import AnalyticsServer if __name__ == '__main__': - asyncio.run(main()) + server = AnalyticsServer() + asyncio.run(server.run())