diff --git a/bot/analytics/__init__.py b/bot/analytics/__init__.py new file mode 100644 index 00000000..1217553c --- /dev/null +++ b/bot/analytics/__init__.py @@ -0,0 +1,5 @@ +from .cog import Analytics + + +async def setup(bot): + await bot.add_cog(Analytics(bot)) diff --git a/bot/analytics/cog.py b/bot/analytics/cog.py new file mode 100644 index 00000000..57d060e1 --- /dev/null +++ b/bot/analytics/cog.py @@ -0,0 +1,82 @@ +from meta import LionCog, LionBot, LionContext + +from meta.app import shard_talk, appname +from utils.lib import utc_now + +from .data import AnalyticsData +from .events import ( + CommandStatus, CommandEvent, command_event_handler, + GuildAction, GuildEvent, guild_event_handler, + VoiceAction, VoiceEvent, voice_event_handler +) + + +# TODO: Client side might be better handled as a single connection fed by a queue? +# Maybe consider this again after the interactive REPL idea +# Or if it seems like this is giving an absurd amount of traffic + + +class Analytics(LionCog): + def __init__(self, bot: LionBot): + self.bot = bot + self.data = bot.db.load_registry(AnalyticsData()) + self.an_app = bot.config.analytics['appname'] + + self.talk_command_event = command_event_handler.bind(shard_talk).route + self.talk_guild_event = guild_event_handler.bind(shard_talk).route + self.talk_voice_event = voice_event_handler.bind(shard_talk).route + + async def cog_load(self): + await self.data.init() + + @LionCog.listener() + async def on_guild_join(self, guild): + """ + Send guild join event. + """ + event = GuildEvent( + appname=appname, + guildid=guild.id, + action=GuildAction.JOINED, + created_at=utc_now() + ) + await self.talk_guild_event(event).send(self.an_app, wait_for_reply=False) + + @LionCog.listener() + async def on_guild_leave(self, guild): + """ + Send guild leave event + """ + event = GuildEvent( + appname=appname, + guildid=guild.id, + action=GuildAction.LEFT, + created_at=utc_now() + ) + await self.talk_guild_event(event).send(self.an_app, wait_for_reply=False) + + @LionCog.listener() + async def on_command_completion(self, ctx: LionContext): + """ + Send command completed successfully. + """ + event = CommandEvent( + appname=appname, + cmdname=ctx.command.name if ctx.command else 'Unknown', + cogname=ctx.cog.qualified_name if ctx.cog else None, + userid=ctx.author.id, + created_at=utc_now(), + status=CommandStatus.COMPLETED, + execution_time=0, + guildid=ctx.guild.id if ctx.guild else None, + ctxid=ctx.message.id + ) + await self.talk_command_event(event).send(self.an_app, wait_for_reply=False) + + @LionCog.listener() + async def on_command_error(self, ctx: LionContext, error): + """ + Send command failed. + """ + # TODO: Add command error field? + ... diff --git a/bot/analytics/data.py b/bot/analytics/data.py new file mode 100644 index 00000000..2d906d1b --- /dev/null +++ b/bot/analytics/data.py @@ -0,0 +1,186 @@ +from enum import Enum + +from data.registry import Registry +from data.adapted import RegisterEnum +from data.models import RowModel +from data.columns import Integer, String, Timestamp, Column + + +class CommandStatus(Enum): + """ + Schema + ------ + CREATE TYPE analytics.CommandStatus AS ENUM( + 'COMPLETED', + 'CANCELLED' + 'FAILED' + ); + """ + COMPLETED = ('COMPLETED',) + CANCELLED = ('CANCELLED',) + FAILED = ('FAILED',) + + +class GuildAction(Enum): + """ + Schema + ------ + CREATE TYPE analytics.GuildAction AS ENUM( + 'JOINED', + 'LEFT' + ); + """ + JOINED = ('JOINED',) + LEFT = ('LEFT',) + + +class VoiceAction(Enum): + """ + Schema + ------ + CREATE TYPE analytics.VoiceAction AS ENUM( + 'JOINED', + 'LEFT' + ); + """ + JOINED = ('JOINED',) + LEFT = ('LEFT',) + + +class AnalyticsData(Registry, name='analytics'): + CommandStatus = RegisterEnum(CommandStatus, name="analytics.CommandStatus") + GuildAction = RegisterEnum(GuildAction, name="analytics.GuildAction") + VoiceAction = RegisterEnum(VoiceAction, name="analytics.VoiceAction") + + class Snapshots(RowModel): + """ + Schema + ------ + CREATE TABLE analytics.snapshots( + snapshotid SERIAL PRIMARY KEY, + appname TEXT NOT NULL REFERENCES bot_config (appname), + guild_count INTEGER NOT NULL, + study_time BIGINT NOT NULL, + in_voice INTEGER NOT NULL, + _created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc') + ); + """ + _schema_ = 'analytics' + _tablename_ = 'snapshots' + + snapshotid = Integer(primary=True) + appname = String() + guild_count = Integer() + in_voice = Integer() + study_time = Integer() + created_at = Timestamp() + + class Events(RowModel): + """ + Schema + ------ + CREATE TABLE analytics.events( + eventid SERIAL PRIMARY KEY, + appname TEXT NOT NULL REFERENCES bot_config (appname), + ctxid BIGINT, + guildid BIGINT, + _created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc') + ); + """ + _schema_ = 'analytics' + _tablename_ = 'events' + + eventid = Integer(primary=True) + appname = String() + ctxid = Integer() + guildid = Integer() + created_at = Timestamp() + + class Commands(RowModel): + """ + Schema + ------ + CREATE TABLE analytics.commands( + cmdname TEXT NOT NULL, + cogname TEXT, + userid BIGINT NOT NULL, + status analytics.CommandStatus NOT NULL, + execution_time INTEGER NOT NULL + ) INHERITS (analytics.events); + """ + _schema_ = 'analytics' + _tablename_ = 'commands' + + eventid = Integer(primary=True) + appname = String() + ctxid = Integer() + guildid = Integer() + created_at = Timestamp() + + cmdname = String() + cogname = String() + userid = Integer() + status: Column[CommandStatus] = Column() + execution_time = Integer() + + class Guilds(RowModel): + """ + Schema + ------ + CREATE TABLE analytics.guilds( + guildid BIGINT NOT NULL, + action analytics.GuildAction NOT NULL + ) INHERITS (analytics.events); + """ + _schema_ = 'analytics' + _tablename_ = 'guilds' + + eventid = Integer(primary=True) + appname = String() + ctxid = Integer() + guildid = Integer() + created_at = Timestamp() + + action: Column[GuildAction] = Column() + + class VoiceSession(RowModel): + """ + Schema + ------ + CREATE TABLE analytics.voice_sessions( + userid BIGINT NOT NULL, + action analytics.VoiceAction NOT NULL + ); + """ + _schema_ = 'analytics' + _tablename_ = 'voice_sessions' + + eventid = Integer(primary=True) + appname = String() + ctxid = Integer() + guildid = Integer() + created_at = Timestamp() + + userid = Integer() + action: Column[GuildAction] = Column() + + class GuiRender(RowModel): + """ + Schema + ------ + CREATE TABLE analytics.gui_renders( + cardname TEXT NOT NULL, + duration INTEGER NOT NULL + ); + """ + _schema_ = 'analytics' + _tablename_ = 'gui_renders' + + eventid = Integer(primary=True) + appname = String() + ctxid = Integer() + guildid = Integer() + created_at = Timestamp() + + cardname = String() + duration = Integer() diff --git a/bot/analytics/events.py b/bot/analytics/events.py new file mode 100644 index 00000000..dbd8e4fb --- /dev/null +++ b/bot/analytics/events.py @@ -0,0 +1,143 @@ +import asyncio +import datetime +from collections import namedtuple +from typing import NamedTuple, Optional, Generic, Type, TypeVar + +from meta.ipc import AppRoute, AppClient +from data import RowModel +from .data import AnalyticsData, CommandStatus, VoiceAction, GuildAction + + +""" +TODO +Snapshot type? Incremental or manual? +Request snapshot route will require all shards to be online +""" + +T = TypeVar('T') + + +class EventHandler(Generic[T]): + batch_size = 1 + + def __init__(self, route_name: str, model: Type[RowModel], struct: Type[T]): + self.model = model + self.struct = struct + + self.route_name = route_name + self._route: Optional[AppRoute] = None + self._client: Optional[AppClient] = None + + self.queue: asyncio.Queue[T] = asyncio.Queue() + self.batch: list[T] = [] + self._consumer_task: Optional[asyncio.Task] = None + + @property + def route(self): + if self._route is None: + self._route = AppRoute(self.handle_event, name=self.route_name) + return self._route + + async def handle_event(self, data): + await self.queue.put(data) + + async def consumer(self): + while True: + try: + item = await self.queue.get() + self.batch.append(item) + if len(self.batch) > self.batch_size: + await self.process_batch() + except asyncio.CancelledError: + raise + except asyncio.TimeoutError: + raise + + async def process_batch(self): + # TODO: copy syntax might be more efficient here + await self.model.table.insert_many( + self.struct._fields, + *map(tuple, self.batch) + ) + self.batch.clear() + + def bind(self, client: AppClient): + """ + Bind our route to the given client. + """ + if self._client: + raise ValueError("This EventHandler is already attached!") + + self._client = client + self.route._client = client + client.routes[self.route_name] = self.route + return self + + def unbind(self): + """ + Unbind from the client. + """ + if not self._client: + raise ValueError("Not attached, cannot detach!") + self._client.routes.pop(self.route_name, None) + self._route = None + return self + + async def attach(self, client: AppClient): + """ + Attach to a ShardTalk client and start listening. + """ + self.bind(client) + self._consumer_task = asyncio.create_task(self.consumer()) + return self + + async def detach(self): + """ + Stop listening and detach from client. + """ + self.unbind() + if self._consumer_task and not self._consumer_task.done(): + self._consumer_task.cancel() + self._consumer_task = None + return self + + +class CommandEvent(NamedTuple): + appname: str + cmdname: str + userid: int + created_at: datetime.datetime + status: CommandStatus + execution_time: int + cogname: Optional[str] = None + guildid: Optional[int] = None + ctxid: Optional[int] = None + + +command_event_handler: EventHandler[CommandEvent] = EventHandler( + 'command_event', AnalyticsData.Commands, CommandEvent +) + + +class GuildEvent(NamedTuple): + appname: str + guildid: int + action: GuildAction + created_at: datetime.datetime + + +guild_event_handler: EventHandler[GuildEvent] = EventHandler( + 'guild_event', AnalyticsData.Guilds, GuildEvent +) + + +class VoiceEvent(NamedTuple): + appname: str + guildid: int + action: VoiceAction + created_at: datetime.datetime + + +voice_event_handler: EventHandler[VoiceEvent] = EventHandler( + 'voice_event', AnalyticsData.VoiceSession, VoiceEvent +) diff --git a/bot/analytics/server.py b/bot/analytics/server.py new file mode 100644 index 00000000..5356cd02 --- /dev/null +++ b/bot/analytics/server.py @@ -0,0 +1,44 @@ +import asyncio +import logging + +from meta import conf, appname +from meta.logger import log_context, log_action_stack, logging_context, log_app +from meta.ipc import AppClient + +from data import Database + +from .events import command_event_handler, guild_event_handler, voice_event_handler +from .data import AnalyticsData + + +logger = logging.getLogger(__name__) + +for name in conf.config.options('LOGGING_LEVELS', no_defaults=True): + logging.getLogger(name).setLevel(conf.logging_levels[name]) + + +db = Database(conf.data['args']) + + +async def main(): + log_action_stack.set(['Analytics']) + log_app.set(conf.analytics['appname']) + + async with await db.connect(): + db.load_registry(AnalyticsData()) + analytic_talk = AppClient( + conf.analytics['appname'], + appname, + {'host': conf.analytics['server_host'], 'port': int(conf.analytics['server_port'])}, + {'host': conf.appipc['server_host'], 'port': int(conf.appipc['server_port'])} + ) + await analytic_talk.connect() + cmd = await command_event_handler.attach(analytic_talk) + guild = await guild_event_handler.attach(analytic_talk) + voice = await voice_event_handler.attach(analytic_talk) + logger.info("Finished initialising, waiting for events.") + await asyncio.gather(cmd._consumer_task, guild._consumer_task, voice._consumer_task) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/bot/main.py b/bot/main.py index d0d832c3..77b764f1 100644 --- a/bot/main.py +++ b/bot/main.py @@ -46,7 +46,7 @@ async def main(): shardname=shardname, db=db, config=conf, - initial_extensions=['core', 'modules'], + initial_extensions=['core', 'analytics', 'modules'], web_client=session, app_ipc=shard_talk, testing_guilds=conf.bot.getintlist('admin_guilds'), diff --git a/bot/meta/app.py b/bot/meta/app.py index aa84a3bb..8ff8caf2 100644 --- a/bot/meta/app.py +++ b/bot/meta/app.py @@ -35,6 +35,7 @@ log_app.set(shardname) shard_talk = AppClient( shardname, + appname, {'host': args.host, 'port': args.port}, {'host': conf.appipc['server_host'], 'port': int(conf.appipc['server_port'])} ) diff --git a/bot/meta/ipc/__init__.py b/bot/meta/ipc/__init__.py index e69de29b..1716f8de 100644 --- a/bot/meta/ipc/__init__.py +++ b/bot/meta/ipc/__init__.py @@ -0,0 +1,2 @@ +from .client import AppClient, AppPayload, AppRoute +from .server import AppServer diff --git a/bot/meta/ipc/client.py b/bot/meta/ipc/client.py index a4d173ed..82fca1ad 100644 --- a/bot/meta/ipc/client.py +++ b/bot/meta/ipc/client.py @@ -15,8 +15,9 @@ Address: TypeAlias = dict[str, Any] class AppClient: routes: dict[str, 'AppRoute'] = {} # route_name -> Callable[Any, Awaitable[Any]] - def __init__(self, appid: str, client_address: Address, server_address: Address): - self.appid = appid + def __init__(self, appid: str, basename: str, client_address: Address, server_address: Address): + self.appid = appid # String identifier for this ShardTalk client + self.basename = basename # Prefix used to recognise app peers self.address = client_address self.server_address = server_address @@ -30,6 +31,10 @@ class AppClient: self.register_route('drop_peer')(self.drop_peer) self.register_route('peer_list')(self.peer_list) + @property + def my_peers(self): + return {peerid: peer for peerid, peer in self.peers.items() if peerid.startswith(self.basename)} + def register_route(self, name=None): def wrapper(coro): route = AppRoute(coro, client=self, name=name) @@ -94,7 +99,7 @@ class AppClient: # TODO ... - async def request(self, appid, payload: 'AppPayload'): + async def request(self, appid, payload: 'AppPayload', wait_for_reply=True): with logging_context(action=f"Req {appid}"): try: if appid not in self.peers: @@ -107,18 +112,22 @@ class AppClient: writer.write(payload.encoded()) await writer.drain() writer.write_eof() - result = await reader.read() - writer.close() - decoded = payload.route.decode(result) - return decoded + if wait_for_reply: + result = await reader.read() + writer.close() + decoded = payload.route.decode(result) + return decoded + else: + return None except Exception: logging.exception(f"Failed to send request to {appid}'") return None - async def requestall(self, payload, except_self=True): + async def requestall(self, payload, except_self=True, only_my_peers=True): with logging_context(action="Broadcast"): + peerlist = self.my_peers if only_my_peers else self.peers results = await asyncio.gather( - *(self.request(appid, payload) for appid in self.peers if (appid != self.appid or not except_self)), + *(self.request(appid, payload) for appid in peerlist if (appid != self.appid or not except_self)), return_exceptions=True ) return dict(zip(self.peers.keys(), results)) diff --git a/data/migration/v12-13/migration.sql b/data/migration/v12-13/migration.sql index 1b65bf62..f231aaf8 100644 --- a/data/migration/v12-13/migration.sql +++ b/data/migration/v12-13/migration.sql @@ -8,6 +8,7 @@ ALTER TABLE guild_config ADD COLUMN first_joined_at TIMESTAMPTZ; ALTER TABLE guild_config ADD COLUMN left_at TIMESTAMPTZ; +-- Bot config data CREATE TABLE app_config( appname TEXT PRIMARY KEY, created_at TIMESTAMPTZ NOT NULL DEFAULT now() @@ -19,7 +20,7 @@ CREATE TABLE bot_config( ); CREATE TABLE shard_data( - shard_name TEXT PRIMARY KEY, + shardname TEXT PRIMARY KEY, appname TEXT REFERENCES bot_config(appname) ON DELETE CASCADE, shard_id INTEGER NOT NULL, shard_count INTEGER NOT NULL, @@ -47,3 +48,67 @@ CREATE TABLE bot_config_presence( activity_type ActivityType, activity_name Text ); + + +-- Analytics data +-- DROP SCHEMA IF EXISTS "analytics" CASCADE; +CREATE SCHEMA "analytics"; + +CREATE TABLE analytics.snapshots( + snapshotid SERIAL PRIMARY KEY, + appname TEXT NOT NULL REFERENCES bot_config (appname), + guild_count INTEGER NOT NULL, + study_time BIGINT NOT NULL, + in_voice INTEGER NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc') +); + + +CREATE TABLE analytics.events( + eventid SERIAL PRIMARY KEY, + appname TEXT NOT NULL REFERENCES bot_config (appname), + ctxid BIGINT, + guildid BIGINT, + created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc') +); + +CREATE TYPE analytics.CommandStatus AS ENUM( + 'COMPLETED', + 'CANCELLED' + 'FAILED' +); + +CREATE TABLE analytics.commands( + cmdname TEXT NOT NULL, + cogname TEXT, + userid BIGINT NOT NULL, + status analytics.CommandStatus NOT NULL, + execution_time INTEGER NOT NULL +) INHERITS (analytics.events); + + +CREATE TYPE analytics.GuildAction AS ENUM( + 'JOINED', + 'LEFT' +); + +CREATE TABLE analytics.guilds( + guildid BIGINT NOT NULL, + action analytics.GuildAction NOT NULL +) INHERITS (analytics.events); + + +CREATE TYPE analytics.VoiceAction AS ENUM( + 'JOINED', + 'LEFT' +); + +CREATE TABLE analytics.voice_sessions( + userid BIGINT NOT NULL, + action analytics.VoiceAction NOT NULL +); + +CREATE TABLE analytics.gui_renders( + cardname TEXT NOT NULL, + duration INTEGER NOT NULL +); diff --git a/run_analytics.py b/run_analytics.py new file mode 100755 index 00000000..e2f5a4f5 --- /dev/null +++ b/run_analytics.py @@ -0,0 +1,11 @@ +import sys +import os +import asyncio + +sys.path.insert(0, os.path.join(os.getcwd(), "bot")) + +from bot.analytics.server import main + + +if __name__ == '__main__': + asyncio.run(main())