Files
twitch-subathon-plugin/subathon/channel.py

262 lines
7.8 KiB
Python

from typing import Optional, TypeAlias, TypedDict
import json
from collections import defaultdict
from datetime import datetime, timedelta
from dataclasses import dataclass
from data.queries import JOINTYPE, ORDER
from meta.sockets import Channel
from utils.lib import utc_now
from modules.profiles.profiles.profiles import ProfilesRegistry
from . import logger
from .data import (
SubathonData,
Subathon,
RunningSubathon,
SubathonContribution,
SubathonGoal,
)
from .subathon import ActiveSubathon, SubathonRegistry
# ISO formatted timestamp
ISOTimestamp: TypeAlias = str
class GoalPayload(TypedDict):
required: float
name: str
class ContributionPayload(TypedDict):
user_name: str
user_id: str | None
amount: float
seconds_added: int
timestamp: ISOTimestamp
class LeaderboardItemPayload(TypedDict):
user_name: str
user_id: str | None
amount: float
class ScoreTablePayload(TypedDict):
bit_score: float # points per bit
t1_score: float # points per T1 sub
t2_score: float # points per T2 sub
t3_score: float # points per T3 sub
score_time: float # seconds added per point
class SubathonPayload(TypedDict):
end_at: ISOTimestamp
is_running: bool
score_table: ScoreTablePayload
name: str
total_contribution: float
total_subpoints: int
total_bits: int
goals_met: int
goals_total: int
last_goal: GoalPayload | None
next_goal: GoalPayload | None
goals: list[GoalPayload]
last_contribution: ContributionPayload | None
leaderboard: list[LeaderboardItemPayload]
async def prepare_subathon(
profiler: ProfilesRegistry, subathon: ActiveSubathon
) -> SubathonPayload:
total_score = await subathon.get_score()
# Build the goal data
goals = await subathon.get_goals()
goals_met = 0
last_goalp = None
next_goalp = None
goalps = []
total_goals = len(goals)
for goal in goals:
goalp = GoalPayload(
required=float(goal.required_score),
name=goal.description,
)
goalps.append(goalp)
if goal.required_score <= total_score:
last_goalp = goalp
goals_met += 1
elif next_goalp is None:
next_goalp = goalp
# Build the last contribution information
contribs = await subathon.fetch_contributions().limit(1)
last_contrib: ContributionPayload | None = None
if contribs:
contrib = contribs[0]
if contrib.profileid:
profile = await profiler.get_profile(contrib.profileid)
assert profile is not None
name = profile.nickname
user_id = None
else:
name = "Anonymous"
user_id = None
last_contrib = ContributionPayload(
user_name=name,
user_id=user_id,
amount=float(contrib.score),
seconds_added=subathon.get_score_time(contrib.score),
timestamp=contrib.created_at.isoformat(),
)
# Build the score table
score_table = ScoreTablePayload(
bit_score=float(subathon.subathondata.bit_score),
t1_score=float(subathon.subathondata.sub1_score),
t2_score=float(subathon.subathondata.sub2_score),
t3_score=float(subathon.subathondata.sub3_score),
score_time=int(subathon.subathondata.score_time),
)
# Build the contribution leaderboard
query = SubathonContribution.table.select_where(
subathon_id=subathon.subathondata.subathon_id
)
query.join("user_profiles", using=("profileid",), join_type=JOINTYPE.LEFT)
query.select("subathon_id", "profileid", total="SUM(score)")
query.order_by("total", direction=ORDER.DESC)
query.group_by("subathon_id", "profileid")
query.with_no_adapter()
results = await query
leaderboard = []
for row in results:
if pid := row["profileid"]:
profile = await profiler.get_profile(pid)
name = (profile.nickname if profile else None) or "Unknown"
else:
name = "Anonymous"
score = row["total"]
item = LeaderboardItemPayload(
user_name=name,
user_id=pid,
amount=float(score),
)
leaderboard.append(item)
# Build the raw totals
subpoints = await subathon.get_total_subs()
bits = await subathon.get_total_bits()
# Finally, put together the payload
payload = SubathonPayload(
name=subathon.subathondata.name,
end_at=(await subathon.get_ending()).isoformat(),
is_running=(subathon.running),
score_table=score_table,
total_contribution=float(await subathon.get_score()),
total_subpoints=subpoints,
total_bits=bits,
goals_met=goals_met,
goals_total=total_goals,
last_goal=last_goalp,
next_goal=next_goalp,
goals=goalps,
last_contribution=last_contrib,
leaderboard=leaderboard,
)
return payload
class TimerChannel(Channel):
name = "SubTimer"
def __init__(
self, profiler: ProfilesRegistry, subathons: SubathonRegistry, **kwargs
):
super().__init__(**kwargs)
self.profiler: ProfilesRegistry = profiler
self.subathons: SubathonRegistry = subathons
# Map of communities to webhooks listening for this community
self.communities = defaultdict(
set
) # Map of communityid -> listening websockets
async def on_connection(self, websocket, event):
if not (cidstr := event.get("community")):
raise ValueError("Subathon timer connection missing communityid")
elif not cidstr.isdigit():
raise ValueError("Community id provided is not an integer")
cid = int(cidstr)
community = await self.profiler.get_community(cid)
if community is None:
raise ValueError("Unknown community provided.")
await super().on_connection(websocket, event)
self.communities[cid].add(websocket)
subathon = await self.subathons.get_active_subathon(cid)
if subathon:
payload = await prepare_subathon(self.profiler, subathon)
await self.send_subathon_update(cid, payload, websocket)
else:
await self.send_no_subathon(cid, websocket)
async def del_connection(self, websocket):
for wss in self.communities.values():
wss.discard(websocket)
await super().del_connection(websocket)
async def send_subathon_update(
self, communityid: int, payload: SubathonPayload, websocket=None
):
for ws in (websocket,) if websocket else self.communities[communityid]:
await self.send_event(
{
"type": "DO",
"method": "setTimer",
"args": payload,
},
websocket=ws,
)
async def send_subathon_ended(
self, communityid: int, payload: SubathonPayload, websocket=None
):
for ws in (websocket,) if websocket else self.communities[communityid]:
await self.send_event(
{
"type": "DO",
"method": "endTimer",
"args": payload,
},
websocket=ws,
)
async def send_no_subathon(self, communityid: int, websocket=None):
for ws in (websocket,) if websocket else self.communities[communityid]:
await self.send_event(
{
"type": "DO",
"method": "noTimer",
"args": {},
},
websocket=ws,
)
async def send_event(self, event, **kwargs):
logger.info(f"Sending websocket event: {json.dumps(event, indent=1)}")
await super().send_event(event, **kwargs)