from typing import Optional, TypeAlias, TypedDict import json from collections import defaultdict from datetime import datetime, timedelta from dataclasses import dataclass from data.queries import JOINTYPE, ORDER from meta.sockets import Channel from utils.lib import utc_now from modules.profiles.profiles.profiles import ProfilesRegistry from . import logger from .data import ( SubathonData, Subathon, RunningSubathon, SubathonContribution, SubathonGoal, ) from .subathon import ActiveSubathon, SubathonRegistry # ISO formatted timestamp ISOTimestamp: TypeAlias = str class GoalPayload(TypedDict): required: float name: str class ContributionPayload(TypedDict): user_name: str user_id: str | None amount: float seconds_added: int timestamp: ISOTimestamp class LeaderboardItemPayload(TypedDict): user_name: str user_id: str | None amount: float class ScoreTablePayload(TypedDict): bit_score: float # points per bit t1_score: float # points per T1 sub t2_score: float # points per T2 sub t3_score: float # points per T3 sub score_time: float # seconds added per point class SubathonPayload(TypedDict): end_at: ISOTimestamp is_running: bool score_table: ScoreTablePayload name: str total_contribution: float total_subpoints: int total_bits: int goals_met: int goals_total: int last_goal: GoalPayload | None next_goal: GoalPayload | None goals: list[GoalPayload] last_contribution: ContributionPayload | None leaderboard: list[LeaderboardItemPayload] async def prepare_subathon( profiler: ProfilesRegistry, subathon: ActiveSubathon ) -> SubathonPayload: total_score = await subathon.get_score() # Build the goal data goals = await subathon.get_goals() goals_met = 0 last_goalp = None next_goalp = None goalps = [] total_goals = len(goals) for goal in goals: goalp = GoalPayload( required=float(goal.required_score), name=goal.description, ) goalps.append(goalp) if goal.required_score <= total_score: last_goalp = goalp goals_met += 1 elif next_goalp is None: next_goalp = goalp # Build the last contribution information contribs = await subathon.fetch_contributions().limit(1) last_contrib: ContributionPayload | None = None if contribs: contrib = contribs[0] if contrib.profileid: profile = await profiler.get_profile(contrib.profileid) assert profile is not None name = profile.nickname or "Unknown" user_id = str(profile.profileid) else: name = "Anonymous" user_id = None last_contrib = ContributionPayload( user_name=name, user_id=user_id, amount=float(contrib.score), seconds_added=subathon.get_score_time(contrib.score), timestamp=contrib.created_at.isoformat(), ) # Build the score table score_table = ScoreTablePayload( bit_score=float(subathon.subathondata.bit_score), t1_score=float(subathon.subathondata.sub1_score), t2_score=float(subathon.subathondata.sub2_score), t3_score=float(subathon.subathondata.sub3_score), score_time=int(subathon.subathondata.score_time), ) # Build the contribution leaderboard query = SubathonContribution.table.select_where( subathon_id=subathon.subathondata.subathon_id ) query.join("user_profiles", using=("profileid",), join_type=JOINTYPE.LEFT) query.select("subathon_id", "profileid", total="SUM(score)") query.order_by("total", direction=ORDER.DESC) query.group_by("subathon_id", "profileid") query.with_no_adapter() results = await query leaderboard = [] for row in results: if pid := row["profileid"]: profile = await profiler.get_profile(pid) name = (profile.nickname if profile else None) or "Unknown" else: name = "Anonymous" score = row["total"] item = LeaderboardItemPayload( user_name=name, user_id=pid, amount=float(score), ) leaderboard.append(item) # Build the raw totals subpoints = await subathon.get_total_subs() bits = await subathon.get_total_bits() # Finally, put together the payload payload = SubathonPayload( name=subathon.subathondata.name, end_at=(await subathon.get_ending()).isoformat(), is_running=(subathon.running), score_table=score_table, total_contribution=float(await subathon.get_score()), total_subpoints=subpoints, total_bits=bits, goals_met=goals_met, goals_total=total_goals, last_goal=last_goalp, next_goal=next_goalp, goals=goalps, last_contribution=last_contrib, leaderboard=leaderboard, ) return payload class TimerChannel(Channel): name = "SubTimer" def __init__( self, profiler: ProfilesRegistry, subathons: SubathonRegistry, **kwargs ): super().__init__(**kwargs) self.profiler: ProfilesRegistry = profiler self.subathons: SubathonRegistry = subathons # Map of communities to webhooks listening for this community self.communities = defaultdict( set ) # Map of communityid -> listening websockets async def on_connection(self, websocket, event): if not (cidstr := event.get("community")): raise ValueError("Subathon timer connection missing communityid") elif not cidstr.isdigit(): raise ValueError("Community id provided is not an integer") cid = int(cidstr) community = await self.profiler.get_community(cid) if community is None: raise ValueError("Unknown community provided.") await super().on_connection(websocket, event) self.communities[cid].add(websocket) subathon = await self.subathons.get_active_subathon(cid) if subathon: payload = await prepare_subathon(self.profiler, subathon) await self.send_subathon_update(cid, payload, websocket) else: await self.send_no_subathon(cid, websocket) async def del_connection(self, websocket): for wss in self.communities.values(): wss.discard(websocket) await super().del_connection(websocket) async def send_subathon_update( self, communityid: int, payload: SubathonPayload, websocket=None ): for ws in (websocket,) if websocket else self.communities[communityid]: await self.send_event( { "type": "DO", "method": "setTimer", "args": payload, }, websocket=ws, ) async def send_subathon_ended( self, communityid: int, payload: SubathonPayload, websocket=None ): for ws in (websocket,) if websocket else self.communities[communityid]: await self.send_event( { "type": "DO", "method": "endTimer", "args": payload, }, websocket=ws, ) async def send_no_subathon(self, communityid: int, websocket=None): for ws in (websocket,) if websocket else self.communities[communityid]: await self.send_event( { "type": "DO", "method": "noTimer", "args": {}, }, websocket=ws, ) async def send_event(self, event, **kwargs): logger.info(f"Sending websocket event: {json.dumps(event, indent=1)}") await super().send_event(event, **kwargs)