rewrite: Analytics client-server model.

This commit is contained in:
2022-11-18 18:53:02 +02:00
parent b471e78a75
commit fe51a8d150
11 changed files with 559 additions and 11 deletions

View File

@@ -0,0 +1,5 @@
from .cog import Analytics
async def setup(bot):
await bot.add_cog(Analytics(bot))

82
bot/analytics/cog.py Normal file
View File

@@ -0,0 +1,82 @@
from meta import LionCog, LionBot, LionContext
from meta.app import shard_talk, appname
from utils.lib import utc_now
from .data import AnalyticsData
from .events import (
CommandStatus, CommandEvent, command_event_handler,
GuildAction, GuildEvent, guild_event_handler,
VoiceAction, VoiceEvent, voice_event_handler
)
# TODO: Client side might be better handled as a single connection fed by a queue?
# Maybe consider this again after the interactive REPL idea
# Or if it seems like this is giving an absurd amount of traffic
class Analytics(LionCog):
def __init__(self, bot: LionBot):
self.bot = bot
self.data = bot.db.load_registry(AnalyticsData())
self.an_app = bot.config.analytics['appname']
self.talk_command_event = command_event_handler.bind(shard_talk).route
self.talk_guild_event = guild_event_handler.bind(shard_talk).route
self.talk_voice_event = voice_event_handler.bind(shard_talk).route
async def cog_load(self):
await self.data.init()
@LionCog.listener()
async def on_guild_join(self, guild):
"""
Send guild join event.
"""
event = GuildEvent(
appname=appname,
guildid=guild.id,
action=GuildAction.JOINED,
created_at=utc_now()
)
await self.talk_guild_event(event).send(self.an_app, wait_for_reply=False)
@LionCog.listener()
async def on_guild_leave(self, guild):
"""
Send guild leave event
"""
event = GuildEvent(
appname=appname,
guildid=guild.id,
action=GuildAction.LEFT,
created_at=utc_now()
)
await self.talk_guild_event(event).send(self.an_app, wait_for_reply=False)
@LionCog.listener()
async def on_command_completion(self, ctx: LionContext):
"""
Send command completed successfully.
"""
event = CommandEvent(
appname=appname,
cmdname=ctx.command.name if ctx.command else 'Unknown',
cogname=ctx.cog.qualified_name if ctx.cog else None,
userid=ctx.author.id,
created_at=utc_now(),
status=CommandStatus.COMPLETED,
execution_time=0,
guildid=ctx.guild.id if ctx.guild else None,
ctxid=ctx.message.id
)
await self.talk_command_event(event).send(self.an_app, wait_for_reply=False)
@LionCog.listener()
async def on_command_error(self, ctx: LionContext, error):
"""
Send command failed.
"""
# TODO: Add command error field?
...

186
bot/analytics/data.py Normal file
View File

@@ -0,0 +1,186 @@
from enum import Enum
from data.registry import Registry
from data.adapted import RegisterEnum
from data.models import RowModel
from data.columns import Integer, String, Timestamp, Column
class CommandStatus(Enum):
"""
Schema
------
CREATE TYPE analytics.CommandStatus AS ENUM(
'COMPLETED',
'CANCELLED'
'FAILED'
);
"""
COMPLETED = ('COMPLETED',)
CANCELLED = ('CANCELLED',)
FAILED = ('FAILED',)
class GuildAction(Enum):
"""
Schema
------
CREATE TYPE analytics.GuildAction AS ENUM(
'JOINED',
'LEFT'
);
"""
JOINED = ('JOINED',)
LEFT = ('LEFT',)
class VoiceAction(Enum):
"""
Schema
------
CREATE TYPE analytics.VoiceAction AS ENUM(
'JOINED',
'LEFT'
);
"""
JOINED = ('JOINED',)
LEFT = ('LEFT',)
class AnalyticsData(Registry, name='analytics'):
CommandStatus = RegisterEnum(CommandStatus, name="analytics.CommandStatus")
GuildAction = RegisterEnum(GuildAction, name="analytics.GuildAction")
VoiceAction = RegisterEnum(VoiceAction, name="analytics.VoiceAction")
class Snapshots(RowModel):
"""
Schema
------
CREATE TABLE analytics.snapshots(
snapshotid SERIAL PRIMARY KEY,
appname TEXT NOT NULL REFERENCES bot_config (appname),
guild_count INTEGER NOT NULL,
study_time BIGINT NOT NULL,
in_voice INTEGER NOT NULL,
_created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc')
);
"""
_schema_ = 'analytics'
_tablename_ = 'snapshots'
snapshotid = Integer(primary=True)
appname = String()
guild_count = Integer()
in_voice = Integer()
study_time = Integer()
created_at = Timestamp()
class Events(RowModel):
"""
Schema
------
CREATE TABLE analytics.events(
eventid SERIAL PRIMARY KEY,
appname TEXT NOT NULL REFERENCES bot_config (appname),
ctxid BIGINT,
guildid BIGINT,
_created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc')
);
"""
_schema_ = 'analytics'
_tablename_ = 'events'
eventid = Integer(primary=True)
appname = String()
ctxid = Integer()
guildid = Integer()
created_at = Timestamp()
class Commands(RowModel):
"""
Schema
------
CREATE TABLE analytics.commands(
cmdname TEXT NOT NULL,
cogname TEXT,
userid BIGINT NOT NULL,
status analytics.CommandStatus NOT NULL,
execution_time INTEGER NOT NULL
) INHERITS (analytics.events);
"""
_schema_ = 'analytics'
_tablename_ = 'commands'
eventid = Integer(primary=True)
appname = String()
ctxid = Integer()
guildid = Integer()
created_at = Timestamp()
cmdname = String()
cogname = String()
userid = Integer()
status: Column[CommandStatus] = Column()
execution_time = Integer()
class Guilds(RowModel):
"""
Schema
------
CREATE TABLE analytics.guilds(
guildid BIGINT NOT NULL,
action analytics.GuildAction NOT NULL
) INHERITS (analytics.events);
"""
_schema_ = 'analytics'
_tablename_ = 'guilds'
eventid = Integer(primary=True)
appname = String()
ctxid = Integer()
guildid = Integer()
created_at = Timestamp()
action: Column[GuildAction] = Column()
class VoiceSession(RowModel):
"""
Schema
------
CREATE TABLE analytics.voice_sessions(
userid BIGINT NOT NULL,
action analytics.VoiceAction NOT NULL
);
"""
_schema_ = 'analytics'
_tablename_ = 'voice_sessions'
eventid = Integer(primary=True)
appname = String()
ctxid = Integer()
guildid = Integer()
created_at = Timestamp()
userid = Integer()
action: Column[GuildAction] = Column()
class GuiRender(RowModel):
"""
Schema
------
CREATE TABLE analytics.gui_renders(
cardname TEXT NOT NULL,
duration INTEGER NOT NULL
);
"""
_schema_ = 'analytics'
_tablename_ = 'gui_renders'
eventid = Integer(primary=True)
appname = String()
ctxid = Integer()
guildid = Integer()
created_at = Timestamp()
cardname = String()
duration = Integer()

143
bot/analytics/events.py Normal file
View File

@@ -0,0 +1,143 @@
import asyncio
import datetime
from collections import namedtuple
from typing import NamedTuple, Optional, Generic, Type, TypeVar
from meta.ipc import AppRoute, AppClient
from data import RowModel
from .data import AnalyticsData, CommandStatus, VoiceAction, GuildAction
"""
TODO
Snapshot type? Incremental or manual?
Request snapshot route will require all shards to be online
"""
T = TypeVar('T')
class EventHandler(Generic[T]):
batch_size = 1
def __init__(self, route_name: str, model: Type[RowModel], struct: Type[T]):
self.model = model
self.struct = struct
self.route_name = route_name
self._route: Optional[AppRoute] = None
self._client: Optional[AppClient] = None
self.queue: asyncio.Queue[T] = asyncio.Queue()
self.batch: list[T] = []
self._consumer_task: Optional[asyncio.Task] = None
@property
def route(self):
if self._route is None:
self._route = AppRoute(self.handle_event, name=self.route_name)
return self._route
async def handle_event(self, data):
await self.queue.put(data)
async def consumer(self):
while True:
try:
item = await self.queue.get()
self.batch.append(item)
if len(self.batch) > self.batch_size:
await self.process_batch()
except asyncio.CancelledError:
raise
except asyncio.TimeoutError:
raise
async def process_batch(self):
# TODO: copy syntax might be more efficient here
await self.model.table.insert_many(
self.struct._fields,
*map(tuple, self.batch)
)
self.batch.clear()
def bind(self, client: AppClient):
"""
Bind our route to the given client.
"""
if self._client:
raise ValueError("This EventHandler is already attached!")
self._client = client
self.route._client = client
client.routes[self.route_name] = self.route
return self
def unbind(self):
"""
Unbind from the client.
"""
if not self._client:
raise ValueError("Not attached, cannot detach!")
self._client.routes.pop(self.route_name, None)
self._route = None
return self
async def attach(self, client: AppClient):
"""
Attach to a ShardTalk client and start listening.
"""
self.bind(client)
self._consumer_task = asyncio.create_task(self.consumer())
return self
async def detach(self):
"""
Stop listening and detach from client.
"""
self.unbind()
if self._consumer_task and not self._consumer_task.done():
self._consumer_task.cancel()
self._consumer_task = None
return self
class CommandEvent(NamedTuple):
appname: str
cmdname: str
userid: int
created_at: datetime.datetime
status: CommandStatus
execution_time: int
cogname: Optional[str] = None
guildid: Optional[int] = None
ctxid: Optional[int] = None
command_event_handler: EventHandler[CommandEvent] = EventHandler(
'command_event', AnalyticsData.Commands, CommandEvent
)
class GuildEvent(NamedTuple):
appname: str
guildid: int
action: GuildAction
created_at: datetime.datetime
guild_event_handler: EventHandler[GuildEvent] = EventHandler(
'guild_event', AnalyticsData.Guilds, GuildEvent
)
class VoiceEvent(NamedTuple):
appname: str
guildid: int
action: VoiceAction
created_at: datetime.datetime
voice_event_handler: EventHandler[VoiceEvent] = EventHandler(
'voice_event', AnalyticsData.VoiceSession, VoiceEvent
)

44
bot/analytics/server.py Normal file
View File

@@ -0,0 +1,44 @@
import asyncio
import logging
from meta import conf, appname
from meta.logger import log_context, log_action_stack, logging_context, log_app
from meta.ipc import AppClient
from data import Database
from .events import command_event_handler, guild_event_handler, voice_event_handler
from .data import AnalyticsData
logger = logging.getLogger(__name__)
for name in conf.config.options('LOGGING_LEVELS', no_defaults=True):
logging.getLogger(name).setLevel(conf.logging_levels[name])
db = Database(conf.data['args'])
async def main():
log_action_stack.set(['Analytics'])
log_app.set(conf.analytics['appname'])
async with await db.connect():
db.load_registry(AnalyticsData())
analytic_talk = AppClient(
conf.analytics['appname'],
appname,
{'host': conf.analytics['server_host'], 'port': int(conf.analytics['server_port'])},
{'host': conf.appipc['server_host'], 'port': int(conf.appipc['server_port'])}
)
await analytic_talk.connect()
cmd = await command_event_handler.attach(analytic_talk)
guild = await guild_event_handler.attach(analytic_talk)
voice = await voice_event_handler.attach(analytic_talk)
logger.info("Finished initialising, waiting for events.")
await asyncio.gather(cmd._consumer_task, guild._consumer_task, voice._consumer_task)
if __name__ == '__main__':
asyncio.run(main())

View File

@@ -46,7 +46,7 @@ async def main():
shardname=shardname,
db=db,
config=conf,
initial_extensions=['core', 'modules'],
initial_extensions=['core', 'analytics', 'modules'],
web_client=session,
app_ipc=shard_talk,
testing_guilds=conf.bot.getintlist('admin_guilds'),

View File

@@ -35,6 +35,7 @@ log_app.set(shardname)
shard_talk = AppClient(
shardname,
appname,
{'host': args.host, 'port': args.port},
{'host': conf.appipc['server_host'], 'port': int(conf.appipc['server_port'])}
)

View File

@@ -0,0 +1,2 @@
from .client import AppClient, AppPayload, AppRoute
from .server import AppServer

View File

@@ -15,8 +15,9 @@ Address: TypeAlias = dict[str, Any]
class AppClient:
routes: dict[str, 'AppRoute'] = {} # route_name -> Callable[Any, Awaitable[Any]]
def __init__(self, appid: str, client_address: Address, server_address: Address):
self.appid = appid
def __init__(self, appid: str, basename: str, client_address: Address, server_address: Address):
self.appid = appid # String identifier for this ShardTalk client
self.basename = basename # Prefix used to recognise app peers
self.address = client_address
self.server_address = server_address
@@ -30,6 +31,10 @@ class AppClient:
self.register_route('drop_peer')(self.drop_peer)
self.register_route('peer_list')(self.peer_list)
@property
def my_peers(self):
return {peerid: peer for peerid, peer in self.peers.items() if peerid.startswith(self.basename)}
def register_route(self, name=None):
def wrapper(coro):
route = AppRoute(coro, client=self, name=name)
@@ -94,7 +99,7 @@ class AppClient:
# TODO
...
async def request(self, appid, payload: 'AppPayload'):
async def request(self, appid, payload: 'AppPayload', wait_for_reply=True):
with logging_context(action=f"Req {appid}"):
try:
if appid not in self.peers:
@@ -107,18 +112,22 @@ class AppClient:
writer.write(payload.encoded())
await writer.drain()
writer.write_eof()
result = await reader.read()
writer.close()
decoded = payload.route.decode(result)
return decoded
if wait_for_reply:
result = await reader.read()
writer.close()
decoded = payload.route.decode(result)
return decoded
else:
return None
except Exception:
logging.exception(f"Failed to send request to {appid}'")
return None
async def requestall(self, payload, except_self=True):
async def requestall(self, payload, except_self=True, only_my_peers=True):
with logging_context(action="Broadcast"):
peerlist = self.my_peers if only_my_peers else self.peers
results = await asyncio.gather(
*(self.request(appid, payload) for appid in self.peers if (appid != self.appid or not except_self)),
*(self.request(appid, payload) for appid in peerlist if (appid != self.appid or not except_self)),
return_exceptions=True
)
return dict(zip(self.peers.keys(), results))

View File

@@ -8,6 +8,7 @@ ALTER TABLE guild_config ADD COLUMN first_joined_at TIMESTAMPTZ;
ALTER TABLE guild_config ADD COLUMN left_at TIMESTAMPTZ;
-- Bot config data
CREATE TABLE app_config(
appname TEXT PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
@@ -19,7 +20,7 @@ CREATE TABLE bot_config(
);
CREATE TABLE shard_data(
shard_name TEXT PRIMARY KEY,
shardname TEXT PRIMARY KEY,
appname TEXT REFERENCES bot_config(appname) ON DELETE CASCADE,
shard_id INTEGER NOT NULL,
shard_count INTEGER NOT NULL,
@@ -47,3 +48,67 @@ CREATE TABLE bot_config_presence(
activity_type ActivityType,
activity_name Text
);
-- Analytics data
-- DROP SCHEMA IF EXISTS "analytics" CASCADE;
CREATE SCHEMA "analytics";
CREATE TABLE analytics.snapshots(
snapshotid SERIAL PRIMARY KEY,
appname TEXT NOT NULL REFERENCES bot_config (appname),
guild_count INTEGER NOT NULL,
study_time BIGINT NOT NULL,
in_voice INTEGER NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc')
);
CREATE TABLE analytics.events(
eventid SERIAL PRIMARY KEY,
appname TEXT NOT NULL REFERENCES bot_config (appname),
ctxid BIGINT,
guildid BIGINT,
created_at TIMESTAMPTZ NOT NULL DEFAULT (now() at time zone 'utc')
);
CREATE TYPE analytics.CommandStatus AS ENUM(
'COMPLETED',
'CANCELLED'
'FAILED'
);
CREATE TABLE analytics.commands(
cmdname TEXT NOT NULL,
cogname TEXT,
userid BIGINT NOT NULL,
status analytics.CommandStatus NOT NULL,
execution_time INTEGER NOT NULL
) INHERITS (analytics.events);
CREATE TYPE analytics.GuildAction AS ENUM(
'JOINED',
'LEFT'
);
CREATE TABLE analytics.guilds(
guildid BIGINT NOT NULL,
action analytics.GuildAction NOT NULL
) INHERITS (analytics.events);
CREATE TYPE analytics.VoiceAction AS ENUM(
'JOINED',
'LEFT'
);
CREATE TABLE analytics.voice_sessions(
userid BIGINT NOT NULL,
action analytics.VoiceAction NOT NULL
);
CREATE TABLE analytics.gui_renders(
cardname TEXT NOT NULL,
duration INTEGER NOT NULL
);

11
run_analytics.py Executable file
View File

@@ -0,0 +1,11 @@
import sys
import os
import asyncio
sys.path.insert(0, os.path.join(os.getcwd(), "bot"))
from bot.analytics.server import main
if __name__ == '__main__':
asyncio.run(main())