from typing import Optional, TypeAlias, TypedDict
from collections import defaultdict
from datetime import datetime, timedelta
from dataclasses import dataclass

from meta.sockets import Channel
from utils.lib import utc_now
from modules.profiles.profiles.profiles import ProfilesRegistry

from . import logger
from .data import SubathonData, Subathon, RunningSubathon, SubathonContribution, SubathonGoal
from .subathon import ActiveSubathon, SubathonRegistry


# ISO formatted timestamp
ISOTimestamp: TypeAlias = str


class GoalPayload(TypedDict):
    """Wire format for a single subathon goal."""

    required: float
    name: str


class ContributionPayload(TypedDict):
    """Wire format for a single contribution to the subathon."""

    user_name: str
    user_id: str | None
    amount: float
    seconds_added: int
    timestamp: ISOTimestamp


class ScoreTablePayload(TypedDict):
    """Wire format for the scoring configuration of a subathon."""

    bit_score: float  # points per bit
    t1_score: float  # points per T1 sub
    t2_score: float  # points per T2 sub
    t3_score: float  # points per T3 sub
    score_time: float  # seconds added per point


class SubathonPayload(TypedDict):
    """Wire format for the full timer state sent to listening clients."""

    end_at: ISOTimestamp
    is_running: bool
    score_table: ScoreTablePayload

    name: str
    total_contribution: float
    goals_met: int
    goals_total: int

    last_goal: GoalPayload | None
    next_goal: GoalPayload | None
    last_contribution: ContributionPayload | None


def _split_goals(goals, total_score):
    """Split ``goals`` into those already met and the next one to reach.

    Walks ``goals`` in the order given and stops at the first goal whose
    ``required_score`` exceeds ``total_score``.

    NOTE(review): this assumes ``goals`` is ordered by ascending
    ``required_score`` - confirm against ``ActiveSubathon.get_goals``.

    Returns a ``(goals_met, last_goal, next_goal)`` tuple, where ``last_goal``
    is the most recently met goal (or ``None``) and ``next_goal`` is the first
    unmet goal (or ``None`` when every goal has been met).
    """
    goals_met = 0
    last_goal = None
    next_goal = None
    for goal in goals:
        # A goal is met once the accumulated score has reached its requirement.
        if goal.required_score <= total_score:
            last_goal = goal
            goals_met += 1
        else:
            next_goal = goal
            break
    return goals_met, last_goal, next_goal


def _goal_payload(goal) -> GoalPayload | None:
    """Convert a goal row into its wire format, passing ``None`` through."""
    if goal is None:
        return None
    return GoalPayload(
        required=float(goal.required_score),
        name=goal.description,
    )


async def prepare_subathon(profiler: ProfilesRegistry, subathon: ActiveSubathon) -> SubathonPayload:
    """Build the ``SubathonPayload`` describing the current state of ``subathon``.

    Reads the total score, the goals, the latest contribution and the ending
    time from ``subathon``, and resolves the latest contributor's nickname
    through ``profiler`` when the contribution has a ``profileid``.
    """
    total_score = await subathon.get_score()
    goals = await subathon.get_goals()
    goals_met, last_goal, next_goal = _split_goals(goals, total_score)

    # Only the first row returned by the contribution query is reported.
    contribs = await subathon.fetch_contributions().limit(1)
    last_contrib: ContributionPayload | None = None
    if contribs:
        contrib = contribs[0]
        if contrib.profileid:
            profile = await profiler.get_profile(contrib.profileid)
            assert profile is not None
            name = profile.nickname
        else:
            name = 'Anonymous'
        # NOTE(review): user_id is never populated, even when a profile is
        # known - confirm whether clients expect an id here.
        last_contrib = ContributionPayload(
            user_name=name,
            user_id=None,
            amount=float(contrib.score),
            seconds_added=subathon.get_score_time(contrib.score),
            timestamp=contrib.created_at.isoformat(),
        )

    data = subathon.subathondata
    score_table = ScoreTablePayload(
        bit_score=float(data.bit_score),
        t1_score=float(data.sub1_score),
        t2_score=float(data.sub2_score),
        t3_score=float(data.sub3_score),
        # Declared as float in ScoreTablePayload; int() would truncate a
        # fractional seconds-per-point value.
        score_time=float(data.score_time),
    )

    ending = await subathon.get_ending()
    return SubathonPayload(
        name=data.name,
        end_at=ending.isoformat(),
        is_running=subathon.running,
        score_table=score_table,
        # Reuse the score fetched above so the total and the goal split are
        # computed from the same value.
        total_contribution=float(total_score),
        goals_met=goals_met,
        goals_total=len(goals),
        last_goal=_goal_payload(last_goal),
        next_goal=_goal_payload(next_goal),
        last_contribution=last_contrib,
    )


class TimerChannel(Channel):
    """Channel pushing subathon timer events to websockets, grouped by community."""

    name = 'SubTimer'

    def __init__(self, profiler: ProfilesRegistry, subathons: SubathonRegistry, **kwargs):
        super().__init__(**kwargs)
        self.profiler: ProfilesRegistry = profiler
        self.subathons: SubathonRegistry = subathons

        # Map of communityid -> set of websockets listening for that community
        self.communities = defaultdict(set)

    async def on_connection(self, websocket, event):
        """Register ``websocket`` for the community named in ``event``.

        ``event['community']`` must be a non-empty string of digits naming a
        community known to the profile registry. After registration the
        websocket is immediately sent the active subathon's state, or a
        ``noTimer`` event when the community has no active subathon.

        Raises:
            ValueError: if the community id is missing, is not an integer,
                or does not match a known community.
        """
        if not (cidstr := event.get('community')):
            raise ValueError("Subathon timer connection missing communityid")
        elif not cidstr.isdigit():
            raise ValueError("Community id provided is not an integer")
        cid = int(cidstr)

        community = await self.profiler.get_community(cid)
        if community is None:
            raise ValueError("Unknown community provided.")

        await super().on_connection(websocket, event)
        self.communities[cid].add(websocket)

        subathon = await self.subathons.get_active_subathon(cid)
        if subathon:
            payload = await prepare_subathon(self.profiler, subathon)
            await self.send_subathon_update(cid, payload, websocket)
        else:
            await self.send_no_subathon(cid, websocket)

    async def del_connection(self, websocket):
        """Remove ``websocket`` from every community it was listening to."""
        for wss in self.communities.values():
            wss.discard(websocket)
        # Drop communities left without listeners so the map does not keep
        # growing with empty sets.
        for cid in [cid for cid, wss in self.communities.items() if not wss]:
            del self.communities[cid]
        await super().del_connection(websocket)

    def _listeners(self, communityid: int, websocket=None) -> tuple:
        """Return the websockets an event should be delivered to.

        A given ``websocket`` is the sole recipient; otherwise the result is a
        snapshot of the community's listeners. The snapshot keeps iteration
        safe if the listener set changes while a send is awaited, and ``.get``
        avoids creating an empty entry for a community with no listeners.
        """
        if websocket is not None:
            return (websocket,)
        return tuple(self.communities.get(communityid, ()))

    async def _send_do(self, method: str, args, communityid: int, websocket=None):
        """Send a ``DO`` event calling ``method`` with ``args`` to each recipient."""
        for ws in self._listeners(communityid, websocket):
            await self.send_event(
                {
                    'type': "DO",
                    'method': method,
                    'args': args,
                },
                websocket=ws
            )

    async def send_subathon_update(self, communityid: int, payload: SubathonPayload, websocket=None):
        """Send a ``setTimer`` event carrying ``payload``.

        Delivered to ``websocket`` only when one is given, otherwise to every
        websocket listening for ``communityid``.
        """
        await self._send_do("setTimer", payload, communityid, websocket)

    async def send_subathon_ended(self, communityid: int, payload: SubathonPayload, websocket=None):
        """Send an ``endTimer`` event carrying ``payload``.

        Delivered to ``websocket`` only when one is given, otherwise to every
        websocket listening for ``communityid``.
        """
        await self._send_do("endTimer", payload, communityid, websocket)

    async def send_no_subathon(self, communityid: int, websocket=None):
        """Send a ``noTimer`` event with empty arguments.

        Delivered to ``websocket`` only when one is given, otherwise to every
        websocket listening for ``communityid``.
        """
        await self._send_do("noTimer", {}, communityid, websocket)