Files
twitch-subathon-plugin/subathon/component.py
2025-09-01 19:21:07 +10:00

494 lines
19 KiB
Python

from datetime import datetime, timedelta
from typing import Optional
import random
import twitchio
from twitchio import PartialUser, Scopes, eventsub
from twitchio.ext import commands as cmds
from datamodels import BotChannel, Communities, UserProfile
from meta import CrocBot
from utils.lib import utc_now, strfdelta
from sockets import Channel, register_channel
from . import logger
from .data import SubathonData, Subathon, RunningSubathon, SubathonContribution, SubathonGoal
class TimerChannel(Channel):
    """Websocket channel that pushes the subathon timer state to clients.

    Each update is a ``setTimer`` event carrying the ISO-formatted end time
    of the timer and whether the timer is currently running.
    """

    name = 'Timer'

    def __init__(self, cog: 'SubathonComponent', **kwargs):
        """Create the channel.

        cog: the component used to look up the active subathon.
        kwargs: forwarded unchanged to the base ``Channel`` constructor.
        """
        super().__init__(**kwargs)
        self.cog = cog

        # NOTE(review): the community is hard-coded to id 1, so every client
        # sees the timer of that one community -- confirm this is intended.
        self.communityid = 1

    async def on_connection(self, websocket, event):
        """Send the current timer state to a newly connected websocket."""
        await super().on_connection(websocket, event)
        await self.send_set(
            **await self.get_args_for(self.communityid),
            websocket=websocket,
        )

    async def send_updates(self):
        """Send the current timer state without targeting a specific websocket."""
        await self.send_set(**await self.get_args_for(self.communityid))

    async def get_args_for(self, channelid):
        """Build the ``send_set`` keyword arguments for a community.

        channelid: community id passed to the cog's ``get_active_subathon``.

        Returns a dict with ``end_at`` (a datetime) and ``running`` (a bool).
        Without an active subathon the timer is reported as stopped and
        ending now.
        """
        active = await self.cog.get_active_subathon(channelid)
        if active is None:
            # utc_now must be *called* here: send_set invokes .isoformat() on
            # this value, which fails on the bare function object.
            return {
                'end_at': utc_now(),
                'running': False,
            }

        # Fetch the remaining time first so the end time is measured from the
        # moment the value is available rather than from before the lookup.
        remaining = await active.get_remaining()
        return {
            'end_at': utc_now() + timedelta(seconds=remaining),
            'running': active.running,
        }

    async def send_set(self, end_at, running, websocket=None):
        """Emit a ``setTimer`` event.

        end_at: datetime at which the timer ends (serialised via isoformat()).
        running: whether the timer is currently counting down.
        websocket: forwarded to ``send_event``; left as None by send_updates.
        """
        await self.send_event({
            'type': "DO",
            'method': 'setTimer',
            'args': {
                'end_at': end_at.isoformat(),
                'running': running,
            }
        }, websocket=websocket)
class ActiveSubathon:
    """A subathon row paired with its (optional) running-timer row.

    The subathon counts as running exactly when ``runningdata`` is set.
    """

    def __init__(self, subathondata: Subathon, runningdata: RunningSubathon | None):
        self.subathondata = subathondata
        self.runningdata = runningdata

    @property
    def running(self):
        """True while a running-timer row is attached."""
        return self.runningdata is not None

    def _get_total_time(self, score: float):
        """Return the uncapped timer length: initial time plus time earned by score."""
        return self.subathondata.initial_time + self.get_score_time(score)

    async def check_cap(self):
        """Return True when a time cap is configured and has been reached."""
        cap = self.subathondata.timecap
        if not cap:
            return False
        score = await self.get_score()
        return self._get_total_time(score) >= cap

    async def pause(self):
        """Stop the timer, folding the elapsed running time into the stored duration.

        Raises ValueError if the subathon is not running.
        """
        if self.runningdata is None:
            raise ValueError("This subathon is not running!")
        new_duration = self.get_duration()
        await self.subathondata.update(duration=new_duration)
        await self.runningdata.delete()
        # Detach the deleted row. Otherwise `running` stays True and
        # get_duration() would add the same running segment a second time.
        self.runningdata = None

    async def resume(self):
        """Start the timer by creating a running-timer row.

        Raises ValueError if the subathon is already running.
        """
        if self.running:
            raise ValueError("This subathon is already running!")
        self.runningdata = await RunningSubathon.create(
            subathon_id=self.subathondata.subathon_id
        )

    async def get_score(self) -> float:
        """Return the summed score of all contributions to this subathon."""
        rows = await SubathonContribution.fetch_where(
            subathon_id=self.subathondata.subathon_id
        )
        return sum(row.score for row in rows)

    def get_score_time(self, score: float) -> int:
        """Return the time contributed by this score (score * score_time, truncated)."""
        return int(score * self.subathondata.score_time)

    def get_duration(self) -> int:
        """Return the duration of this subathon so far.

        This is the stored duration plus, while running, the whole seconds
        elapsed since the running row's ``last_started``.
        """
        duration = self.subathondata.duration
        if self.runningdata:
            elapsed = utc_now() - self.runningdata.last_started
            duration += int(elapsed.total_seconds())
        return duration

    async def get_remaining(self) -> int:
        """Return the time left on the timer: (capped) total time minus duration.

        No clamping is applied, so the result is negative once the duration
        exceeds the total time.
        """
        score = await self.get_score()
        total_time = self._get_total_time(score)
        if cap := self.subathondata.timecap:
            total_time = min(total_time, cap)
        return total_time - self.get_duration()

    async def add_contribution(self, profileid: int | None, score: float, event_id: int | None) -> SubathonContribution:
        """Create and return a contribution row for this subathon."""
        return await SubathonContribution.create(
            subathon_id=self.subathondata.subathon_id,
            profileid=profileid, score=score, event_id=event_id
        )

    async def get_goals(self) -> list[SubathonGoal]:
        """Return this subathon's goals ordered by ``required_score``."""
        goals = await SubathonGoal.fetch_where(
            subathon_id=self.subathondata.subathon_id
        ).order_by('required_score')
        return goals
class SubathonComponent(cmds.Component):
    """Chat component that runs a subathon: a timer extended by bits and subs.

    Listens for contribution events, records them against the community's
    active subathon, announces them in chat, pushes timer updates to the
    websocket channel, and exposes the ``subathon`` and ``goals`` commands.
    """

    def __init__(self, bot: CrocBot):
        self.bot = bot
        self.data = bot.dbconn.load_registry(SubathonData())
        self.channel = TimerChannel(self)
        register_channel('SubTimer', self.channel)

    # ----- API -----
    async def component_load(self):
        # TODO: Setup the websocket
        await self.data.init()

    async def component_teardown(self):
        pass

    # ----- Methods -----
    async def get_community(self, twitchid: str, name: str | None) -> Communities:
        """Fetch the community for a Twitch channel via the bot."""
        return await self.bot.community_fetch(twitchid=twitchid, name=name)

    async def get_profile(self, twitchid: str, name: str | None) -> UserProfile:
        """Fetch the user profile for a Twitch user via the bot."""
        return await self.bot.profile_fetch(twitchid=twitchid, name=name)

    async def get_active_subathon(self, communityid: int) -> ActiveSubathon | None:
        """Return the community's un-ended subathon, or None if there is none."""
        rows = await Subathon.fetch_where(communityid=communityid, ended_at=None)
        if not rows:
            return None
        subathondata = rows[0]
        running = await RunningSubathon.fetch(subathondata.subathon_id)
        return ActiveSubathon(subathondata, running)

    async def goalcheck(self, active: ActiveSubathon, channel: PartialUser):
        """Announce every goal whose required score is met and not yet notified."""
        goals = await active.get_goals()
        score = await active.get_score()
        for number, goal in enumerate(goals, start=1):
            if not goal.notified and goal.required_score <= score:
                # Goal completed, notify channel
                await channel.send_message(
                    f"We have reached Goal #{number}: {goal.description} !! Thank you everyone for your support <3",
                    sender=self.bot.bot_id,
                )
                await goal.update(notified=True)

    # ----- Private helpers -----
    def _is_broadcaster(self, ctx: cmds.Context) -> bool:
        """Return True when the command was invoked by the channel's broadcaster.

        ``ctx.broadcaster`` is the channel's user object and therefore always
        truthy, so it cannot be used as a permission flag; compare ids instead.
        """
        return str(ctx.author.id) == str(ctx.broadcaster.id)

    def _is_privileged(self, ctx: cmds.Context) -> bool:
        """Return True when the invoker is the broadcaster or a moderator."""
        return self._is_broadcaster(ctx) or bool(ctx.author.moderator)

    async def _active_for_context(self, ctx: cmds.Context) -> ActiveSubathon | None:
        """Return the active subathon of the channel the command was used in."""
        community = await self.get_community(ctx.broadcaster.id, ctx.broadcaster.name)
        return await self.get_active_subathon(community.communityid)

    def _sub_multiplier(self, data: Subathon, tier) -> float:
        """Return the configured score for a sub tier (1000, 2000 or 3000).

        Raises ValueError for any other tier.
        """
        multipliers = {
            1000: data.sub1_score,
            2000: data.sub2_score,
            3000: data.sub3_score,
        }
        try:
            return multipliers[int(tier)]
        except KeyError:
            raise ValueError(f"Unknown sub tier {tier}") from None

    def _contributor_name(self, payload) -> str:
        """Return the contributing user's name, or 'Anonymous' without a user."""
        return payload.user.name if payload.user else 'Anonymous'

    def _format_added(self, seconds: int) -> str:
        """Describe added time in minutes, or in seconds when under one minute."""
        minutes = int(seconds // 60)
        if minutes > 0:
            return f"{minutes} minutes"
        return f"{seconds} seconds"

    async def _apply_contribution(self, active: ActiveSubathon, event_row, score, summary: str, broadcaster):
        """Record a contribution, announce it, push the timer and check goals.

        summary: the "<name> contributed ..." prefix of the chat message.
        """
        await active.add_contribution(event_row['profileid'], score, event_row['event_id'])

        added = self._format_added(active.get_score_time(score))
        if not await active.check_cap():
            summary += f" and added {added} to the timer! Thank you holono1Heart"
        else:
            # The cap is reached, so no more time is added to the timer.
            summary += " towards our studython! Thank you holono1Heart"

        await broadcaster.send_message(summary, sender=self.bot.bot_id)
        await self.channel.send_updates()
        await self.goalcheck(active, broadcaster)

    async def _handle_single_sub(self, event_row, sub_payload):
        """Credit one subscription at the score of its tier."""
        active = await self.get_active_subathon(event_row['communityid'])
        if active is None:
            return
        score = self._sub_multiplier(active.subathondata, sub_payload.tier)
        name = self._contributor_name(sub_payload)
        pl = 's' if score > 1 else ''
        await self._apply_contribution(
            active, event_row, score,
            f"{name} contributed {score} sub{pl}",
            sub_payload.broadcaster,
        )

    # ----- Event Handlers -----
    # Each payload is unpacked as (event_row, detail_row, twitch_payload).
    @cmds.Component.listener()
    async def event_safe_bits_use(self, payload):
        """Credit a bits contribution to the active subathon."""
        event_row, detail_row, bits_payload = payload
        active = await self.get_active_subathon(event_row['communityid'])
        if active is None:
            return
        score = detail_row['bits'] * active.subathondata.bit_score

        # Report the number of bits given, not the score derived from them.
        bits = bits_payload.bits
        name = self._contributor_name(bits_payload)
        pl = 's' if bits != 1 else ''
        await self._apply_contribution(
            active, event_row, score,
            f"{name} contributed {bits} bit{pl}",
            bits_payload.broadcaster,
        )

    @cmds.Component.listener()
    async def event_safe_subscription(self, payload):
        """Credit a non-gifted subscription to the active subathon."""
        event_row, _detail_row, sub_payload = payload
        if sub_payload.gift:
            # Ignore gifted here
            return
        await self._handle_single_sub(event_row, sub_payload)

    @cmds.Component.listener()
    async def event_safe_subscription_gift(self, payload):
        """Credit a batch of gifted subscriptions to the active subathon."""
        event_row, _detail_row, gift_payload = payload
        active = await self.get_active_subathon(event_row['communityid'])
        if active is None:
            return
        multiplier = self._sub_multiplier(active.subathondata, gift_payload.tier)
        score = multiplier * gift_payload.total
        name = self._contributor_name(gift_payload)
        await self._apply_contribution(
            active, event_row, score,
            f"{name} contributed {score} subs",
            gift_payload.broadcaster,
        )

    @cmds.Component.listener()
    async def event_safe_subscription_message(self, payload):
        """Credit a subscription-message event to the active subathon."""
        event_row, _detail_row, sub_payload = payload
        await self._handle_single_sub(event_row, sub_payload)

    # end stream => Automatically pause the timer
    @cmds.Component.listener()
    async def event_stream_offline(self, payload: twitchio.StreamOffline):
        """Pause a running subathon timer when the stream goes offline."""
        community = await self.get_community(payload.broadcaster.id, payload.broadcaster.name)
        active = await self.get_active_subathon(community.communityid)
        if active is None or not active.running:
            return
        await active.pause()
        await payload.broadcaster.send_message(
            "Paused the subathon timer because the stream went offline!",
            sender=self.bot.bot_id
        )
        await self.channel.send_updates()

    # ----- Commands -----
    @cmds.group(name='subathon', aliases=['studython'], invoke_fallback=True)
    async def group_subathon(self, ctx: cmds.Context):
        """Reply with the status of the active subathon."""
        # TODO: Status
        active = await self._active_for_context(ctx)
        if active is None:
            await ctx.reply("No active subathon running!")
            return

        score = await active.get_score()
        goals = await active.get_goals()
        total_goals = len(goals)
        donegoals = sum(1 for goal in goals if score >= goal.required_score)
        goalstr = f"{donegoals}/{total_goals} goals achieved"

        remaining = strfdelta(timedelta(seconds=await active.get_remaining()))
        duration = strfdelta(timedelta(seconds=active.get_duration()))

        text = (
            f"Subathon running for {duration}! {score} (equivalent) subscriptions recieved, {goalstr}, and {remaining} left on the timer"
        )
        await ctx.reply(text)

    # subathon start
    @group_subathon.command(name='setup')
    async def cmd_setup(self, ctx: cmds.Context, initial_hours: float, sub1: float, sub2: float, sub3: float, bit: float, timescore: int, timecap: Optional[int]=None):
        """Create a new (paused) subathon for this channel. Broadcaster only.

        initial_hours: starting timer length in hours.
        sub1, sub2, sub3: score awarded per tier 1/2/3 subscription.
        bit: score awarded per bit.
        timescore: time added to the timer per point of score.
        timecap: optional cap on the total timer length.
        """
        if not self._is_broadcaster(ctx):
            return
        community = await self.get_community(ctx.broadcaster.id, ctx.broadcaster.name)
        cid = community.communityid
        if await self.get_active_subathon(cid) is not None:
            await ctx.reply("There is already an active subathon running! Use !subathon stop to stop it!")
            return
        initial_time = initial_hours * 60 * 60

        await Subathon.create(
            communityid=cid,
            initial_time=initial_time,
            sub1_score=sub1,
            sub2_score=sub2,
            sub3_score=sub3,
            bit_score=bit,
            score_time=timescore,
            timecap=timecap
        )
        await ctx.reply("Setup a new subathon! Use !subathon resume to get the timer running.")
        await self.channel.send_updates()

    # subathon stop
    @group_subathon.command(name='stop')
    async def cmd_stop(self, ctx: cmds.Context):
        """End the active subathon and report its totals. Broadcaster only."""
        if not self._is_broadcaster(ctx):
            return
        active = await self._active_for_context(ctx)
        if active is None:
            await ctx.reply("No active subathon to stop.")
            return

        # Read the duration before pausing: pausing folds the elapsed time
        # into the stored duration, and reading it afterwards could count the
        # final running segment twice.
        dursecs = active.get_duration()
        if active.running:
            await active.pause()
            await self.channel.send_updates()
        await active.subathondata.update(ended_at=utc_now())
        total = await active.get_score()
        dur = strfdelta(timedelta(seconds=dursecs))

        await ctx.reply(
            f"Subathon complete after {dur} with a total of {total} subs, congratulations!"
        )

    # subathon pause
    @group_subathon.command(name='pause')
    async def cmd_pause(self, ctx: cmds.Context):
        """Pause the subathon timer. Broadcaster or moderators only."""
        if not self._is_privileged(ctx):
            return
        active = await self._active_for_context(ctx)
        if active is None:
            await ctx.reply("No active subathon to pause")
        elif active.running:
            await active.pause()
            await ctx.reply("Subathon timer paused!")
            await self.channel.send_updates()
        else:
            await ctx.reply("Subathon timer already paused!")

    # subathon resume
    @group_subathon.command(name='resume')
    async def cmd_resume(self, ctx: cmds.Context):
        """Resume the subathon timer. Broadcaster or moderators only."""
        if not self._is_privileged(ctx):
            return
        active = await self._active_for_context(ctx)
        if active is None:
            await ctx.reply("No active subathon to resume")
        elif not active.running:
            await active.resume()
            await ctx.reply("Subathon timer resumed!")
            await self.channel.send_updates()
        else:
            await ctx.reply("Subathon timer already running!")

    @cmds.group(name='goals', invoke_fallback=True)
    async def group_goals(self, ctx: cmds.Context):
        """List the goals of the active subathon."""
        active = await self._active_for_context(ctx)
        if active is None:
            await ctx.reply("No active subathon running!")
            return

        goals = await active.get_goals()
        if not goals:
            await ctx.reply("No goals have been configured!")
            return
        text = ', '.join(
            f"{goal.required_score} subs: {goal.description}" for goal in goals
        )
        await ctx.reply(f"Subathon Goals! -- {text}")

    @group_goals.command(name='add')
    async def cmd_add(self, ctx: cmds.Context, required: int, *, description: str):
        """Add a goal to the active subathon. Broadcaster or moderators only.

        required: score at which the goal counts as reached.
        description: text announced when the goal is reached.
        """
        if not self._is_privileged(ctx):
            return
        active = await self._active_for_context(ctx)
        if active is None:
            await ctx.reply("No active subathon to goal!")
            return
        await SubathonGoal.create(
            subathon_id=active.subathondata.subathon_id,
            required_score=required,
            description=description,
        )
        await ctx.reply("Goal added!")