rewrite: Analytic snapshots.

This commit is contained in:
2022-11-19 21:02:37 +02:00
parent f912f9eabd
commit 6831781687
9 changed files with 319 additions and 69 deletions

View File

@@ -1,6 +1,14 @@
from meta import LionCog, LionBot, LionContext
import logging
import discord
from discord.ext.commands import Bot, Cog, HybridCommand, HybridCommandError
from discord.ext.commands.errors import CommandInvokeError, CheckFailure
from discord.app_commands.errors import CommandInvokeError as appCommandInvokeError
from meta import LionCog, LionBot, LionContext
from meta.app import shard_talk, appname
from meta.errors import HandledException, SafeCancellation
from meta.logger import log_wrap
from utils.lib import utc_now
from .data import AnalyticsData
@@ -9,6 +17,9 @@ from .events import (
GuildAction, GuildEvent, guild_event_handler,
VoiceAction, VoiceEvent, voice_event_handler
)
from .snapshot import shard_snapshot
logger = logging.getLogger(__name__)
# TODO: Client side might be better handled as a single connection fed by a queue?
@@ -26,10 +37,35 @@ class Analytics(LionCog):
self.talk_guild_event = guild_event_handler.bind(shard_talk).route
self.talk_voice_event = voice_event_handler.bind(shard_talk).route
self.talk_shard_snapshot = shard_talk.register_route()(shard_snapshot)
async def cog_load(self):
await self.data.init()
@LionCog.listener()
@log_wrap(action='AnEvent')
async def on_voice_state_update(self, member, before, after):
if not before.channel and after.channel:
# Member joined channel
action = VoiceAction.JOINED
elif before.channel and not after.channel:
# Member left channel
action = VoiceAction.LEFT
event = VoiceEvent(
appname=appname,
userid=member.id,
guildid=member.guild.id,
action=action,
created_at=utc_now()
)
if self.an_app not in shard_talk.peers:
logger.warning(f"Analytics peer not found, discarding event: {event}")
else:
await self.talk_voice_event(event).send(self.an_app, wait_for_reply=False)
@LionCog.listener()
@log_wrap(action='AnEvent')
async def on_guild_join(self, guild):
"""
Send guild join event.
@@ -40,10 +76,14 @@ class Analytics(LionCog):
action=GuildAction.JOINED,
created_at=utc_now()
)
await self.talk_guild_event(event).send(self.an_app, wait_for_reply=False)
if self.an_app not in shard_talk.peers:
logger.warning(f"Analytics peer not found, discarding event: {event}")
else:
await self.talk_guild_event(event).send(self.an_app, wait_for_reply=False)
@LionCog.listener()
async def on_guild_leave(self, guild):
@log_wrap(action='AnEvent')
async def on_guild_remove(self, guild):
"""
Send guild leave event
"""
@@ -53,13 +93,18 @@ class Analytics(LionCog):
action=GuildAction.LEFT,
created_at=utc_now()
)
await self.talk_guild_event(event).send(self.an_app, wait_for_reply=False)
if self.an_app not in shard_talk.peers:
logger.warning(f"Analytics peer not found, discarding event: {event}")
else:
await self.talk_guild_event(event).send(self.an_app, wait_for_reply=False)
@LionCog.listener()
@log_wrap(action='AnEvent')
async def on_command_completion(self, ctx: LionContext):
"""
Send command completed successfully.
"""
duration = utc_now() - ctx.message.created_at
event = CommandEvent(
appname=appname,
cmdname=ctx.command.name if ctx.command else 'Unknown',
@@ -67,16 +112,63 @@ class Analytics(LionCog):
userid=ctx.author.id,
created_at=utc_now(),
status=CommandStatus.COMPLETED,
execution_time=0,
execution_time=duration.total_seconds(),
guildid=ctx.guild.id if ctx.guild else None,
ctxid=ctx.message.id
)
await self.talk_command_event(event).send(self.an_app, wait_for_reply=False)
if self.an_app not in shard_talk.peers:
logger.warning(f"Analytics peer not found, discarding event: {event}")
else:
await self.talk_command_event(event).send(self.an_app, wait_for_reply=False)
@LionCog.listener()
@log_wrap(action='AnEvent')
async def on_command_error(self, ctx: LionContext, error):
"""
Send command failed.
"""
# TODO: Add command error field?
...
duration = utc_now() - ctx.message.created_at
status = CommandStatus.FAILED
err_type = None
try:
err_type = repr(error)
raise error
except (HybridCommandError, CommandInvokeError, appCommandInvokeError):
original = error.original
try:
err_type = repr(original)
if isinstance(original, (HybridCommandError, CommandInvokeError, appCommandInvokeError)):
raise original.original
else:
raise original
except HandledException:
status = CommandStatus.CANCELLED
except SafeCancellation:
status = CommandStatus.CANCELLED
except discord.Forbidden:
status = CommandStatus.CANCELLED
except discord.HTTPException:
status = CommandStatus.CANCELLED
except Exception:
status = CommandStatus.FAILED
except CheckFailure:
status = CommandStatus.CANCELLED
except Exception:
status = CommandStatus.FAILED
event = CommandEvent(
appname=appname,
cmdname=ctx.command.name if ctx.command else 'Unknown',
cogname=ctx.cog.qualified_name if ctx.cog else None,
userid=ctx.author.id,
created_at=utc_now(),
status=status,
error=err_type,
execution_time=duration.total_seconds(),
guildid=ctx.guild.id if ctx.guild else None,
ctxid=ctx.message.id
)
if self.an_app not in shard_talk.peers:
logger.warning(f"Analytics peer not found, discarding event: {event}")
else:
await self.talk_command_event(event).send(self.an_app, wait_for_reply=False)

View File

@@ -60,9 +60,10 @@ class AnalyticsData(Registry, name='analytics'):
snapshotid SERIAL PRIMARY KEY,
appname TEXT NOT NULL REFERENCES bot_config (appname),
guild_count INTEGER NOT NULL,
study_time BIGINT NOT NULL,
member_count INTEGER NOT NULL,
user_count INTEGER NOT NULL,
in_voice INTEGER NOT NULL,
_created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc')
created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc')
);
"""
_schema_ = 'analytics'
@@ -71,8 +72,9 @@ class AnalyticsData(Registry, name='analytics'):
snapshotid = Integer(primary=True)
appname = String()
guild_count = Integer()
member_count = Integer()
user_count = Integer()
in_voice = Integer()
study_time = Integer()
created_at = Timestamp()
class Events(RowModel):
@@ -105,7 +107,7 @@ class AnalyticsData(Registry, name='analytics'):
cogname TEXT,
userid BIGINT NOT NULL,
status analytics.CommandStatus NOT NULL,
execution_time INTEGER NOT NULL
execution_time REAL NOT NULL
) INHERITS (analytics.events);
"""
_schema_ = 'analytics'
@@ -121,7 +123,8 @@ class AnalyticsData(Registry, name='analytics'):
cogname = String()
userid = Integer()
status: Column[CommandStatus] = Column()
execution_time = Integer()
error = String()
execution_time: Column[float] = Column()
class Guilds(RowModel):
"""
@@ -150,7 +153,7 @@ class AnalyticsData(Registry, name='analytics'):
CREATE TABLE analytics.voice_sessions(
userid BIGINT NOT NULL,
action analytics.VoiceAction NOT NULL
);
) INHERITS (analytics.events);
"""
_schema_ = 'analytics'
_tablename_ = 'voice_sessions'
@@ -171,7 +174,7 @@ class AnalyticsData(Registry, name='analytics'):
CREATE TABLE analytics.gui_renders(
cardname TEXT NOT NULL,
duration INTEGER NOT NULL
);
) INHERITS (analytics.events);
"""
_schema_ = 'analytics'
_tablename_ = 'gui_renders'

View File

@@ -1,29 +1,35 @@
import asyncio
import datetime
import logging
from collections import namedtuple
from typing import NamedTuple, Optional, Generic, Type, TypeVar
from meta.ipc import AppRoute, AppClient
from meta.logger import logging_context, log_wrap
from data import RowModel
from .data import AnalyticsData, CommandStatus, VoiceAction, GuildAction
logger = logging.getLogger(__name__)
"""
TODO
Snapshot type? Incremental or manual?
Request snapshot route will require all shards to be online
Update batch size before release, or put it in the config
"""
T = TypeVar('T')
class EventHandler(Generic[T]):
batch_size = 1
def __init__(self, route_name: str, model: Type[RowModel], struct: Type[T]):
def __init__(self, route_name: str, model: Type[RowModel], struct: Type[T], batchsize: int = 20):
self.model = model
self.struct = struct
self.batch_size = batchsize
self.route_name = route_name
self._route: Optional[AppRoute] = None
self._client: Optional[AppClient] = None
@@ -39,27 +45,46 @@ class EventHandler(Generic[T]):
return self._route
async def handle_event(self, data):
await self.queue.put(data)
try:
await self.queue.put(data)
except asyncio.QueueFull:
logger.warning(
f"Queue on event handler {self.route_name} is full! Discarding event {data}"
)
async def consumer(self):
while True:
try:
item = await self.queue.get()
self.batch.append(item)
if len(self.batch) > self.batch_size:
await self.process_batch()
except asyncio.CancelledError:
raise
except asyncio.TimeoutError:
raise
with logging_context(action='consumer'):
while True:
try:
item = await self.queue.get()
self.batch.append(item)
if len(self.batch) > self.batch_size:
await self.process_batch()
except asyncio.CancelledError:
# Try and process the last batch
logger.info(
f"Event handler {self.route_name} received cancellation signal! "
"Trying to process last batch."
)
if self.batch:
await self.process_batch()
raise
except Exception:
logger.exception(
f"Event handler {self.route_name} received unhandled error."
" Ignoring and continuing cautiously."
)
pass
async def process_batch(self):
# TODO: copy syntax might be more efficient here
await self.model.table.insert_many(
self.struct._fields,
*map(tuple, self.batch)
)
self.batch.clear()
with logging_context(action='batch'):
logger.debug("Processing Batch")
# TODO: copy syntax might be more efficient here
await self.model.table.insert_many(
self.struct._fields,
*map(tuple, self.batch)
)
self.batch.clear()
def bind(self, client: AppClient):
"""
@@ -81,14 +106,21 @@ class EventHandler(Generic[T]):
raise ValueError("Not attached, cannot detach!")
self._client.routes.pop(self.route_name, None)
self._route = None
logger.info(
f"EventHandler {self.route_name} has attached to the ShardTalk client."
)
return self
async def attach(self, client: AppClient):
"""
Attach to a ShardTalk client and start listening.
"""
self.bind(client)
self._consumer_task = asyncio.create_task(self.consumer())
with logging_context(action=self.route_name):
self.bind(client)
self._consumer_task = asyncio.create_task(self.consumer())
logger.info(
f"EventHandler {self.route_name} is listening for incoming events."
)
return self
async def detach(self):
@@ -99,6 +131,9 @@ class EventHandler(Generic[T]):
if self._consumer_task and not self._consumer_task.done():
self._consumer_task.cancel()
self._consumer_task = None
logger.info(
f"EventHandler {self.route_name} has detached."
)
return self
@@ -108,14 +143,15 @@ class CommandEvent(NamedTuple):
userid: int
created_at: datetime.datetime
status: CommandStatus
execution_time: int
execution_time: float
error: Optional[str] = None
cogname: Optional[str] = None
guildid: Optional[int] = None
ctxid: Optional[int] = None
command_event_handler: EventHandler[CommandEvent] = EventHandler(
'command_event', AnalyticsData.Commands, CommandEvent
'command_event', AnalyticsData.Commands, CommandEvent, batchsize=1
)
@@ -127,17 +163,18 @@ class GuildEvent(NamedTuple):
guild_event_handler: EventHandler[GuildEvent] = EventHandler(
'guild_event', AnalyticsData.Guilds, GuildEvent
'guild_event', AnalyticsData.Guilds, GuildEvent, batchsize=0
)
class VoiceEvent(NamedTuple):
appname: str
guildid: int
userid: int
action: VoiceAction
created_at: datetime.datetime
voice_event_handler: EventHandler[VoiceEvent] = EventHandler(
'voice_event', AnalyticsData.VoiceSession, VoiceEvent
'voice_event', AnalyticsData.VoiceSession, VoiceEvent, batchsize=5
)

View File

@@ -1,13 +1,17 @@
import asyncio
import logging
from typing import Optional
from meta import conf, appname
from meta.logger import log_context, log_action_stack, logging_context, log_app
from meta.logger import log_context, log_action_stack, logging_context, log_app, log_wrap
from meta.ipc import AppClient
from meta.app import appname_from_shard
from meta.sharding import shard_count
from data import Database
from .events import command_event_handler, guild_event_handler, voice_event_handler
from .snapshot import shard_snapshot, ShardSnapshot
from .data import AnalyticsData
@@ -17,28 +21,108 @@ for name in conf.config.options('LOGGING_LEVELS', no_defaults=True):
logging.getLogger(name).setLevel(conf.logging_levels[name])
db = Database(conf.data['args'])
class AnalyticsServer:
# TODO: Move these to the config
# How often to request snapshots
snap_period = 120
# How soon after a snapshot failure (e.g. not all shards online) to retry
snap_retry_period = 10
def __init__(self) -> None:
self.db = Database(conf.data['args'])
self.data = self.db.load_registry(AnalyticsData())
async def main():
log_action_stack.set(['Analytics'])
log_app.set(conf.analytics['appname'])
self.event_handlers = [
command_event_handler,
guild_event_handler,
voice_event_handler
]
async with await db.connect():
db.load_registry(AnalyticsData())
analytic_talk = AppClient(
self.talk = AppClient(
conf.analytics['appname'],
appname,
{'host': conf.analytics['server_host'], 'port': int(conf.analytics['server_port'])},
{'host': conf.appipc['server_host'], 'port': int(conf.appipc['server_port'])}
)
await analytic_talk.connect()
cmd = await command_event_handler.attach(analytic_talk)
guild = await guild_event_handler.attach(analytic_talk)
voice = await voice_event_handler.attach(analytic_talk)
logger.info("Finished initialising, waiting for events.")
await asyncio.gather(cmd._consumer_task, guild._consumer_task, voice._consumer_task)
self.talk_shard_snapshot = self.talk.register_route()(shard_snapshot)
self._snap_task: Optional[asyncio.Task] = None
async def attach_event_handlers(self):
for handler in self.event_handlers:
await handler.attach(self.talk)
@log_wrap(action='Snap')
async def take_snapshot(self):
# Check if all the shards are registered on shard_talk
expected_peers = [appname_from_shard(i) for i in range(0, shard_count)]
if missing := [peer for peer in expected_peers if peer not in self.talk.peers]:
# We are missing peer(s)!
logger.warning(
f"Analytics could not take snapshot because peers are missing: {', '.join(missing)}"
)
return False
# Everyone is here, ask for shard snapshots
results = await self.talk_shard_snapshot().broadcast()
# Make sure everyone sent results and there were no exceptions (e.g. concurrency)
if not all(result is not None and not isinstance(result, Exception) for result in results.values()):
# This should essentially never happen
# Either some of the shards could not make a snapshot (e.g. Discord client issues)
# or they disconnected in the process.
logger.warning(
f"Analytics could not take snapshot because some peers failed! Partial snapshot: {results}"
)
return False
# Now we have a dictionary of shard snapshots, aggregate, pull in remaining data, and store.
# TODO Possibly move this out into snapshots.py?
aggregate = {field: 0 for field in ShardSnapshot._fields}
for result in results.values():
for field, num in result._asdict().items():
aggregate[field] += num
row = await self.data.Snapshots.create(
appname=appname,
guild_count=aggregate['guild_count'],
member_count=aggregate['member_count'],
user_count=aggregate['user_count'],
in_voice=aggregate['voice_count'],
)
logger.info(f"Created snapshot: {row.data!r}")
return True
@log_wrap(action='SnapLoop')
async def snapshot_loop(self):
while True:
try:
result = await self.take_snapshot()
if result:
await asyncio.sleep(self.snap_period)
else:
logger.info("Snapshot failed, retrying after %d seconds", self.snap_retry_period)
await asyncio.sleep(self.snap_retry_period)
except asyncio.CancelledError:
logger.info("Snapshot loop cancelled, closing.")
return
except Exception:
logger.exception(
"Unhandled exception during snapshot loop. Ignoring and continuing cautiously."
)
await asyncio.sleep(self.snap_retry_period)
async def run(self):
log_action_stack.set(['Analytics'])
log_app.set(conf.analytics['appname'])
async with await self.db.connect():
await self.talk.connect()
await self.attach_event_handlers()
self._snap_task = asyncio.create_task(self.snapshot_loop())
await asyncio.gather(*(handler._consumer_task for handler in self.event_handlers))
if __name__ == '__main__':
asyncio.run(main())
server = AnalyticsServer()
asyncio.run(server.run())

28
bot/analytics/snapshot.py Normal file
View File

@@ -0,0 +1,28 @@
from typing import NamedTuple
from meta.context import ctx_bot
class ShardSnapshot(NamedTuple):
guild_count: int
voice_count: int
member_count: int
user_count: int
async def shard_snapshot():
"""
Take a snapshot of the current shard.
"""
bot = ctx_bot.get()
if bot is None or not bot.is_ready():
# We cannot take a snapshot without Bot
# Just quietly fail
return None
snap = ShardSnapshot(
guild_count=len(bot.guilds),
voice_count=sum(len(channel.members) for guild in bot.guilds for channel in guild.voice_channels),
member_count=sum(len(guild.members) for guild in bot.guilds),
user_count=len(set(m.id for guild in bot.guilds for m in guild.members))
)
return snap

View File

@@ -45,7 +45,7 @@ class CoreCog(LionCog):
shard_count=self.bot.shard_count
)
self.bot.add_listener(self.shard_update_guilds, name='on_guild_join')
self.bot.add_listener(self.shard_update_guilds, name='on_guild_leave')
self.bot.add_listener(self.shard_update_guilds, name='on_guild_remove')
self.bot.core = self

View File

@@ -98,7 +98,8 @@ class LionBot(Bot):
async def on_command(self, ctx: LionContext):
logger.info(
f"Executing command '{ctx.command.qualified_name}' (from module '{ctx.cog.__cog_name__}') "
f"Executing command '{ctx.command.qualified_name}' "
f"(from module '{ctx.cog.qualified_name if ctx.cog else 'None'}') "
f"with arguments {ctx.args} and kwargs {ctx.kwargs}.",
extra={'with_ctx': True}
)
@@ -109,11 +110,15 @@ class LionBot(Bot):
if isinstance(ctx.command, HybridCommand) and ctx.command.app_command:
cmd_str = ctx.command.app_command.to_dict()
try:
raise exception
raise exception from None
except (HybridCommandError, CommandInvokeError, appCommandInvokeError):
original = exception.original
try:
raise original
if isinstance(exception.original, (HybridCommandError, CommandInvokeError, appCommandInvokeError)):
original = exception.original.original
raise original
else:
original = exception.original
raise original
except HandledException:
pass
except SafeCancellation:
@@ -151,7 +156,6 @@ class LionBot(Bot):
except Exception:
logger.exception(
f"Caught an unknown CommandInvokeError while executing: {cmd_str}",
exc_info=exception,
extra={'action': 'BotError', 'with_ctx': True}
)
@@ -183,6 +187,5 @@ class LionBot(Bot):
# Something is very wrong here, don't attempt user interaction.
logger.exception(
f"Caught an unknown top-level exception while executing: {cmd_str}",
exc_info=exception,
extra={'action': 'BotError', 'with_ctx': True}
)

View File

@@ -58,7 +58,8 @@ CREATE TABLE analytics.snapshots(
snapshotid SERIAL PRIMARY KEY,
appname TEXT NOT NULL REFERENCES bot_config (appname),
guild_count INTEGER NOT NULL,
study_time BIGINT NOT NULL,
member_count INTEGER NOT NULL,
user_count INTEGER NOT NULL,
in_voice INTEGER NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc')
);
@@ -83,7 +84,8 @@ CREATE TABLE analytics.commands(
cogname TEXT,
userid BIGINT NOT NULL,
status analytics.CommandStatus NOT NULL,
execution_time INTEGER NOT NULL
error TEXT,
execution_time REAL NOT NULL
) INHERITS (analytics.events);
@@ -106,9 +108,9 @@ CREATE TYPE analytics.VoiceAction AS ENUM(
CREATE TABLE analytics.voice_sessions(
userid BIGINT NOT NULL,
action analytics.VoiceAction NOT NULL
);
) INHERITS (analytics.events);
CREATE TABLE analytics.gui_renders(
cardname TEXT NOT NULL,
duration INTEGER NOT NULL
);
) INHERITS (analytics.events);

View File

@@ -4,8 +4,9 @@ import asyncio
sys.path.insert(0, os.path.join(os.getcwd(), "bot"))
from bot.analytics.server import main
from bot.analytics.server import AnalyticsServer
if __name__ == '__main__':
asyncio.run(main())
server = AnalyticsServer()
asyncio.run(server.run())