feat(twitch): Single-task feature parity

This commit is contained in:
2025-08-10 21:40:31 +10:00
parent b57787589e
commit daf72be82e
5 changed files with 604 additions and 0 deletions

72
data.py
View File

@@ -0,0 +1,72 @@
import datetime as dt
from data import Registry, RowModel, Table
from data.columns import String, Timestamp, Integer, Bool
class Task(RowModel):
"""
Schema
------
"""
_tablename_ = 'tasklist'
_cache_ = {}
taskid = Integer(primary=True)
profileid = Integer()
content = String()
created_at = Timestamp()
deleted_at = Timestamp()
duration = Integer()
started_at = Timestamp()
completed_at = Timestamp()
_timestamp = Timestamp()
class TaskProfile(RowModel):
"""
Schema
------
"""
_tablename_ = 'task_profiles'
_cache_ = {}
profileid = Integer(primary=True)
class TaskInfo(RowModel):
_tablename_ = 'tasklist_info'
taskid = Integer(primary=True)
profileid = Integer()
content = String()
created_at = Timestamp()
duration = Integer()
started_at = Timestamp()
completed_at = Timestamp()
last_started = Timestamp()
nowid = Integer()
order_idx = Integer()
is_running = Bool()
is_planned = Bool()
is_complete = Bool()
tasklabel = Integer()
@property
def total_duration(self):
dur = self.duration
if self.is_running and self.last_started:
dur += (dt.datetime.now(tz=dt.UTC) - self.last_started).total_seconds()
return int(dur)
class TaskData(Registry):
tasklist = Task.table
task_profiles = TaskProfile.table
tasklist_info = TaskInfo.table
nowlist = Table('nowlist')
taskplan = Table('taskplan')

View File

@@ -96,6 +96,8 @@ Okay, I want all these things, but I think a lot of these are a step too far rig
We definitely want Tips as well, for a kind of docs, flow guidlines.
# Data structure

View File

256
tasklist.py Normal file
View File

@@ -0,0 +1,256 @@
import re
from typing import Optional
from data import Database
from utils.lib import utc_now
from .data import Task, TaskInfo, TaskProfile, TaskData
class TasklistParseCreateError(Exception):
...
class Tasklist:
"""
Represents the tasklist for a single user.
"""
profileid: int
data: TaskData
id_tasks: dict[int, TaskInfo]
label_ids: dict[int, int]
current: Optional[int]
plan: list[int]
taskspec_re = re.compile(
r"^(?P<start>\d+)((\s*(?P<range>-)\s*)(?P<end>\d*))?$"
)
def __init__(self, profileid: int, tasks: list[TaskInfo], data: TaskData, update_callback=None):
self.profileid = profileid
self.data = data
self._set_tasks(tasks)
self._callback = update_callback
async def refresh(self):
tasks = await TaskInfo.fetch_where(profileid=self.profileid)
self._set_tasks(tasks)
def _set_tasks(self, tasks: list[TaskInfo]):
self.id_tasks = {task.taskid: task for task in tasks}
self.label_ids = {task.tasklabel: task.taskid for task in tasks}
self.current = next((task.taskid for task in tasks if task.is_running), None)
plan = [task for task in tasks if task.is_planned]
plan.sort(key=lambda task: task.order_idx)
self.plan = [task.taskid for task in plan]
async def on_update(self):
await self.refresh()
if self._callback is not None:
await self._callback(self)
async def create_tasks(self, *contents, **kwargs) -> list[TaskInfo]:
if not contents:
return []
rows = await self.data.tasklist.insert_many(
('profileid', 'content', *kwargs.keys()),
*((self.profileid, content, *kwargs.values()) for content in contents)
)
taskids = [row['taskid'] for row in rows]
info = await TaskInfo.fetch_where(taskid=taskids)
await self.on_update()
# self.id_tasks.update({task.taskid: task for task in info})
# self.label_ids.update({task.tasklabel: task.taskid for task in info})
return info
async def edit_task(self, taskid, new_content):
await self.data.tasklist.update_where(taskid=taskid).set(content=new_content)
await self.on_update()
async def delete_tasks(self, *taskids):
"""
Mark the given taskids as deleted.
"""
if taskids:
await self.data.tasklist.update_where(taskid=taskids).set(deleted_at=utc_now())
await self.on_update()
def get_plan(self) -> list[TaskInfo]:
return [self.id_tasks[id] for id in self.plan]
async def push_plan_head(self, *taskids: int):
plan = self.get_plan()
min_idx = min([task.order_idx for task in plan], default=0)
shift = len(taskids)
plan_data = zip(taskids, range(min_idx - shift, min_idx))
await self.data.taskplan.delete_where(taskid=taskids)
await self.data.taskplan.insert_many(
('taskid', 'order_idx'),
*plan_data
)
# Refreshing here instead of modifying cache
# Because the planned flag will be wrong on the saved taskinfo as well
await self.on_update()
async def push_plan_tail(self, *taskids: int):
plan = self.get_plan()
max_idx = max([task.order_idx for task in plan], default=0)
shift = len(taskids)
plan_data = zip(taskids, range(max_idx + 1, max_idx + shift + 1))
await self.data.taskplan.delete_where(taskid=taskids)
await self.data.taskplan.insert_many(
('taskid', 'order_idx'),
*plan_data
)
await self.on_update()
async def set_plan(self, *taskids: int):
# TODO: Should really be in a transaction
await self.data.taskplan.delete_where(profileid=self.profileid)
if taskids:
plan_data = zip(taskids, range(len(taskids)))
await self.data.taskplan.insert_many(('taskid', 'order_idx'), *plan_data)
await self.on_update()
def get_current(self) -> Optional[TaskInfo]:
return self.id_tasks[self.current] if self.current is not None else None
async def set_now(self, taskid: int):
# Unset current task if it exists
await self.unset_now()
task = self.id_tasks[taskid]
await self.data.nowlist.insert(taskid=taskid, last_started=None if task.is_complete else utc_now())
await self.on_update()
async def unset_now(self):
# Unset the current task and update the duration correctly
# Does not put the current task on the plan
current = self.get_current()
if current is not None:
now = utc_now()
assert current.is_running or (current.last_started is None)
if current.is_running:
duration = (now - current.last_started).total_seconds()
duration += current.duration
await self.data.tasklist.update_where(taskid=current.taskid).set(duration=duration)
await self.data.nowlist.delete_where(taskid=current.taskid)
await self.on_update()
async def complete_tasks(self, *taskids) -> list[TaskInfo]:
# Remove any tasks which are already complete
# TODO: Transaction
taskids = [id for id in taskids if not self.id_tasks[id].is_complete]
if taskids:
now = utc_now()
await self.data.tasklist.update_where(taskid=taskids).set(completed_at=now)
if self.current in taskids:
current = self.get_current()
assert current is not None
assert current.last_started is not None
duration = (utc_now() - current.last_started).total_seconds() + current.duration
await self.data.tasklist.update_where(taskid=self.current).set(duration=duration)
await self.data.nowlist.update_where(taskid=self.current).set(last_started=None)
await self.on_update()
# Return tasks which were actually completed
return [self.id_tasks[taskid] for taskid in taskids]
async def parse_taskspec(self, taskspec: str, multiple=True, create=True) -> list[TaskInfo]:
"""
Parse a user provide taskspec string.
TasklistParseError
TasklistParseNoCreateError
taskspec is comma (not escaped) or semicolon tasks. Can't mix these, and prefer semicolon.
No way of writing an actual semicolon, but that's too bad.
Tasklabels (numerical indexes) may also be specified in ranges.
NOTE: The tasklabels will include completed tasks, even if they aren't shown.
This is inevitable, but important to note.
"""
# First split the userstring
if '\n' in taskspec:
splits = taskspec.split('\n')
elif ';' in taskspec:
splits = taskspec.split(';')
else:
splits = taskspec.split(',')
splits = [split.strip(',; \n') for split in splits]
splits = [split for split in splits if split]
taskids: list[Optional[int]] = []
seen = set()
to_create: list[tuple[int, str]] = []
parsed: list[Optional[int]] = []
# Get max label for use on unterminated ends
maxlabel = max(self.label_ids.keys(), default=None)
i = 0 # Insertion index for parsed, used in to_create
for split in splits:
match = self.taskspec_re.match(split)
if match:
# Task looks like a range or numeric
start = int(match['start'])
ranged = match['range']
end = int(match['end'] or maxlabel or -1)
if ranged:
labels = list(range(start, end + 1))
else:
labels = [start]
for label in labels:
if label in self.label_ids:
taskid = self.label_ids[label]
else:
raise ValueError(f"Unknown task label {label}")
if taskid not in seen:
i += 1
taskids.append(taskid)
seen.add(taskid)
else:
# Presume it is a task we need to create
to_create.append((i, split))
taskids.append(None)
i += 1
if not create:
raise TasklistParseCreateError()
for i, content in to_create:
task, = await self.create_tasks(content)
taskids[i] = task.taskid
assert all(taskid is not None for taskid in taskids)
return [self.id_tasks[taskid] for taskid in taskids]
class TaskRegistry:
def __init__(self, data: TaskData, update_callback=None):
# TODO: Also need to pass some way of fetching the UserProfile
# Possibly community as well, in general
# i.e. pass in the ProfileRegistry
self.data = data
self._callback = update_callback
async def setup(self):
await self.data.init()
async def get_task_profile(self, profileid: int) -> TaskProfile:
# TODO: If we add anything to the TaskProfile, we probably want to make this smarter
return await TaskProfile.fetch_or_create(profileid)
async def get_current_task(self, profileid: int) -> Optional[TaskInfo]:
info = await TaskInfo.fetch_where(profileid=profileid, is_running=True)
return info[0] if info else None
async def get_tasklist(self, profileid: int):
tasks = await TaskInfo.fetch_where(profileid=profileid)
tasklist = Tasklist(profileid, tasks, self.data, update_callback=self._callback)
return tasklist
async def get_nowlist(self) -> list[TaskInfo]:
return await TaskInfo.fetch_where(is_running=True)

274
twitch/component.py Normal file
View File

@@ -0,0 +1,274 @@
from datetime import timedelta
from typing import Optional
import twitchio
from twitchio.ext import commands as cmds
from meta import Bot
from utils.lib import strfdelta, utc_now
from ..data import TaskInfo, TaskProfile, Task, TaskData
from ..tasklist import TaskRegistry
from .. import logger
# TaskView and TasklistView?
# TaskView
class TaskComponent(cmds.Component):
"""
Command Interface
-----------------
!now 1, task, 2-5
!edit
!clear
Deletes the currently set 'now' task.
!done
!plan
!plan a, b, c
!unplan
!clearplan
!notnow
!resume
!soon
!tasklist
!add
!delete
!now a, b, c
!now x
!next
TODO: Probably a TaskView class for consistently formatting the task contents?
Maybe with a DiscordTaskView and TwitchTaskView
TODO: Moderator command clearfor? Not sure how moderators should manage the tasklist.
"""
def __init__(self, bot: Bot):
self.bot = bot
self.data = bot.dbconn.load_registry(TaskData())
self.tasker = TaskRegistry(self.data)
async def component_load(self):
await self.data.init()
await self.tasker.setup()
# TODO: Add the IPC task update callback
async def get_profile_for(self, user):
...
@cmds.command(name='now', aliases=['task', 'check', 'add'])
async def cmd_now(self, ctx: cmds.Context, *, args: Optional[str]):
"""
Examples:
{prefix}now
{prefix}now current task
{prefix}now current task; next task; task after that
"""
# TODO: Breaking change: check and add should change behaviour.
# Check shows the status of the given task (does not allow creation)
# Add adds the task(s) to the end of the plan.
# TODOTODO: Support newline breaking on multi-tasks!! Although prefer semicolons
profileid = (await self.get_profile_for(ctx.author)).profileid
# Get tasklist for this profile
tasklist = await self.tasker.get_tasklist(profileid)
current = tasklist.get_current()
# If we are given a new task, delete the current now, set the new now
if args:
if current:
await tasklist.delete_tasks(current.taskid)
task, = await tasklist.create_tasks(args)
await tasklist.set_now(task.taskid)
# TODO: Maybe I should send the update now instead, after three separate changes..
await ctx.reply("Updated your current task, good luck!")
elif current:
if current.is_complete:
done_ago = strfdelta(utc_now() - current.completed_at)
await ctx.reply(f"You finished '{current.content}' {done_ago} ago!")
else:
started_ago = strfdelta(timedelta(seconds=current.total_duration))
await ctx.reply(f"You have been working on '{current.content}' for {started_ago}!")
else:
await ctx.reply(
"You don't have a task on the tasklist! "
"Show what you are currently working on with, e.g., !now Reading notes"
)
@cmds.command(name='edit')
async def cmd_edit(self, ctx: cmds.Context, *, args: Optional[str]):
"""
Usage:
{prefix}edit [taskid] <new content>
Examples:
{prefix}edit new current task
{prefix}edit 2 New task content for task 2
"""
# TODO: Breaking changes for multi-tasks
# edit will support a numberic argument
if not args:
# TODO: Better USAGE fetching
await ctx.reply(f"Usage: {ctx.prefix}edit <new content>. E.g. {ctx.prefix}edit Now Reading")
return
profile = await self.get_profile_for(ctx.author)
tasklist = await self.tasker.get_tasklist(profile.profileid)
current = tasklist.get_current()
if current:
await tasklist.edit_task(current.taskid, args)
await ctx.reply("Updated your current task!")
else:
await ctx.reply("No current task to edit! ")
@cmds.command(name='delete', aliases=['clear', 'remove'])
async def cmd_delete(self, ctx: cmds.Context, *, taskspec: Optional[str] = None):
"""
Description:
Deletes the specified tasks or the current task.
Examples:
!delete
!delete 1-
"""
# TODO: Breaking changes for multi-tasks
# delete will support an argument
profile = await self.get_profile_for(ctx.author)
tasklist = await self.tasker.get_tasklist(profile.profileid)
current = tasklist.get_current()
if current:
await tasklist.delete_tasks(current.taskid)
await ctx.reply("Deleted your current task from the tasklist!")
else:
await ctx.reply("You don't have a current task set at the moment!")
@cmds.command(name='done')
async def cmd_done(self, ctx: cmds.Context, *, taskspec: Optional[str] = None):
"""
Description:
Marks the specified tasks as complete, or the current task.
Examples:
!done
!done 1-, 3-5, 6
"""
# TODO: We can actually create the task here if it's not done.
profile = await self.get_profile_for(ctx.author)
tasklist = await self.tasker.get_tasklist(profile.profileid)
current = tasklist.get_current()
if taskspec:
tasks = await tasklist.parse_taskspec(taskspec)
else:
tasks = [current] if current else []
if tasks:
# Complete the tasks
completed = await tasklist.complete_tasks(task.taskid for task in tasks)
# Response depends on how many tasks were complete
# Don't show if duration is less than 30 seconds
if not completed:
# No tasks were actually completed
if len(tasks) > 1:
await ctx.reply("You already finished these tasks!")
else:
task = tasks[0]
await ctx.reply(f"You already finished '{task.content}'")
else:
# Note that duration for completed tasks will always be correct
duration = int(sum(task.duration for task in completed))
durstr = strfdelta(timedelta(seconds=duration))
if len(completed) == 1:
task = completed[0]
taskstr = f"Good work finishing '{task.content}'"
if duration > 60:
taskstr += f" You worked on it for {durstr}"
else:
taskstr = f"{len(completed)} more tasks completed, great work!"
if duration > 60:
taskstr += f" You worked on them for {durstr}"
await ctx.reply(taskstr)
elif taskspec:
await ctx.reply(f"'{taskspec}' didn't match any tasks!")
else:
await ctx.reply(
"You don't have a task on the tasklist! "
f"Show what you are currently working on with, e.g., {ctx.prefix}now Reading Notes"
)
@cmds.command(name='next')
async def cmd_next(self, ctx: cmds.Context, *, taskspec: Optional[str] = None):
"""
Description:
Marks the current task as complete, if it exists,
and moves to the next task on your plan.
If an argument is given,
acts as first a !done and then !now with the same argument
Examples:
!next
"""
if not taskspec:
await ctx.reply(
f"Usage:{ctx.prefix}next <next task> "
f"TIP: {ctx.prefix}next completes your current task and sets the given task as your new current task."
)
return
# Very similar to !now, but with a !done first, and different arguments
profile = await self.get_profile_for(ctx.author)
tasklist = await self.tasker.get_tasklist(profile.profileid)
current = tasklist.get_current()
# Complete the current task if it exists
if current and not current.is_complete:
current, = await tasklist.complete_tasks(current.taskid)
new_current, = await tasklist.create_tasks(taskspec)
await tasklist.set_now(new_current.taskid)
if current:
started_ago = strfdelta(timedelta(seconds=current.total_duration))
await ctx.reply(
"Completed your current task and started your next one! Good luck! "
f"You worked on '{current.content}' for {started_ago}"
)
else:
await ctx.reply(
"Started your next task, good luck!"
)
@cmds.command(name='plan')
async def cmd_plan(self, ctx: cmds.Context):
"""
Description:
View or set a plan of tasks to do from your tasklist.
The plan lets you work with just your immediate next tasks,
and writing !next with no arguments will switch you to your next task in the plan.
If given arguments, this command adds the specified tasks
to the end of the plan.
See also !clearplan, !replan, !soon, !later/add
Examples:
...
"""
await ctx.reply("Planning is not implemented yet, coming soon!")
...
@cmds.command(name='unplan', aliases=('clearplan',))
async def cmd_unplan(self, ctx: cmds.Context):
"""
Description:
Clears your plan, or moves the
"""
await ctx.reply("Planning is not implemented yet, coming soon!")
...