Improve subathon modularity.

This commit is contained in:
2025-09-22 23:30:45 +10:00
parent 899f5e7292
commit 033fc5c1ae
3 changed files with 477 additions and 183 deletions

148
subathon/subathon.py Normal file
View File

@@ -0,0 +1,148 @@
from asyncio import Event
from datetime import datetime, timedelta
from utils.lib import utc_now, strfdelta
from modules.profiles.profiles.profiles import ProfilesRegistry
from . import logger
from .data import SubathonData, Subathon, RunningSubathon, SubathonContribution, SubathonGoal
class ActiveSubathon:
def __init__(self, subathondata: Subathon, runningdata: RunningSubathon | None):
self.subathondata = subathondata
self.runningdata = runningdata
@property
def running(self):
return self.runningdata is not None
@property
def name(self):
return self.subathondata.name or 'Subathon'
async def check_cap(self):
if not (cap := self.subathondata.timecap):
return False
else:
score = await self.get_score()
time_earned = self.get_score_time(score)
total_time = self.subathondata.initial_time + time_earned
return total_time >= cap
async def check_finished(self):
"""
Return True if the subathon duration has exceeded its earned time.
"""
return (await self.get_duration() <= 0)
async def pause(self):
"""
Pause the active subathon.
If the subathon is in 'overtime', i.e. it has already finished,
then the total time is saved as the earned time not including the overtime.
"""
if not self.running:
raise ValueError("This subathon is not running!")
assert self.runningdata is not None
new_duration = await self.get_duration()
await self.subathondata.update(duration=new_duration)
await self.runningdata.delete()
self.runningdata = None
async def resume(self):
if self.running:
raise ValueError("This subathon is already running!")
self.runningdata = await RunningSubathon.create(subathon_id=self.subathondata.subathon_id)
async def get_score(self) -> float:
rows = await SubathonContribution.fetch_where(subathon_id=self.subathondata.subathon_id)
return sum(row.score for row in rows)
def get_score_time(self, score: float) -> int:
# Get time contributed by this score
return int(score * self.subathondata.score_time)
async def get_duration(self) -> int:
"""
Number of seconds that this subathon has run for.
Includes current running session if it exists.
If the subathon is already finished (in overtime)
returns the earned time instead of the true duration.
"""
duration = self.subathondata.duration
# Add running duration if required
if self.runningdata:
now = utc_now()
added = int( (now - self.runningdata.last_started).total_seconds() )
duration += added
earned = await self.get_earned()
return min(duration, earned)
async def get_earned(self) -> int:
"""
Number of seconds earned in the subathon so far.
Includes initial time.
Takes into account the cap.
"""
score = await self.get_score()
time_earned = self.get_score_time(score)
total_time = self.subathondata.initial_time + time_earned
if cap := self.subathondata.timecap:
total_time = min(total_time, cap)
return total_time
async def get_remaining(self) -> int:
"""
Number of seconds remaining on the subathon timer.
Will be 0 if finished.
"""
total_time = await self.get_earned()
return total_time - await self.get_duration()
async def get_ending(self):
"""
Ending time of the subathon.
May be in the past if the subathon has already ended.
"""
now = utc_now()
remaining = await self.get_remaining()
return now + timedelta(seconds=remaining)
async def add_contribution(self, profileid: int | None, score: float, event_id: int | None) -> SubathonContribution:
return await SubathonContribution.create(
subathon_id=self.subathondata.subathon_id,
profileid=profileid, score=score, event_id=event_id
)
def fetch_contributions(self, **kwargs):
query = SubathonContribution.fetch_where(
subathon_id=self.subathondata.subathon_id,
**kwargs
).order_by('created_at')
return query
async def get_goals(self) -> list[SubathonGoal]:
goals = await SubathonGoal.fetch_where(subathon_id=self.subathondata.subathon_id).order_by('required_score')
return goals
class SubathonRegistry:
def __init__(self, data: SubathonData, profiler: ProfilesRegistry):
self.data = data
self.profiler = profiler
async def init(self):
await self.data.init()
async def get_active_subathon(self, communityid: int) -> ActiveSubathon | None:
rows = await Subathon.fetch_where(communityid=communityid, ended_at=None)
if rows:
subathondata = rows[0]
running = await RunningSubathon.fetch(subathondata.subathon_id)
subba = ActiveSubathon(subathondata, running)
return subba