rewrite: Initial statistics UI.

This commit is contained in:
2023-03-02 19:01:14 +02:00
parent 8f127af9d0
commit 7dc361b1b9
19 changed files with 2283 additions and 0 deletions

View File

@@ -7,6 +7,7 @@ active = [
'.reminders',
'.shop',
'.tasklist',
'.statistics',
'.test',
]

View File

@@ -0,0 +1,10 @@
from babel.translator import LocalBabel
from meta.LionBot import LionBot
babel = LocalBabel('statistics')
async def setup(bot: LionBot):
from .cog import StatsCog
await bot.add_cog(StatsCog(bot))

View File

@@ -0,0 +1,67 @@
import logging
from typing import Optional
import discord
from discord.ext import commands as cmds
from discord import app_commands as appcmds
from discord.ui.button import ButtonStyle
from meta import LionBot, LionCog, LionContext
from utils.lib import error_embed
from utils.ui import LeoUI, AButton
from . import babel
from .data import StatsData
from .ui import ProfileUI, WeeklyMonthlyUI
_p = babel._p
logger = logging.getLogger(__name__)
class StatsCog(LionCog):
def __init__(self, bot: LionBot):
self.bot = bot
self.data = bot.db.load_registry(StatsData())
async def cog_load(self):
await self.data.init()
@cmds.hybrid_command(
name=_p('cmd:me', "me"),
description=_p(
'cmd:me|desc',
"Display your personal profile and summary statistics."
)
)
async def me_cmd(self, ctx: LionContext):
await ctx.interaction.response.defer(thinking=True)
ui = ProfileUI(self.bot, ctx.author, ctx.guild)
await ui.run(ctx.interaction)
@cmds.hybrid_command(
name=_p('cmd:stats', "stats"),
description=_p(
'cmd:stats|desc',
"Weekly and monthly statistics for your recent activity."
)
)
async def stats_cmd(self, ctx: LionContext):
"""
Statistics command.
"""
await ctx.interaction.response.defer(thinking=True)
ui = WeeklyMonthlyUI(self.bot, ctx.author, ctx.guild)
await ui.run(ctx.interaction)
@cmds.hybrid_command(
name=_p('cmd:leaderboard', "leaderboard"),
description=_p(
'cmd:leaderboard|desc',
"Server leaderboard."
)
)
@appcmds.guild_only
async def leaderboard_cmd(self, ctx: LionContext):
...

View File

@@ -0,0 +1,308 @@
from typing import Optional, Iterable
from itertools import chain
from psycopg import sql
from data import RowModel, Registry, Table
from data.columns import Integer, String, Timestamp, Bool
from utils.lib import utc_now
class StatsData(Registry):
class PastSession(RowModel):
"""
Schema
------
CREATE TABLE session_history(
sessionid SERIAL PRIMARY KEY,
guildid BIGINT NOT NULL,
userid BIGINT NOT NULL,
channelid BIGINT,
channel_type SessionChannelType,
rating INTEGER,
tag TEXT,
start_time TIMESTAMPTZ NOT NULL,
duration INTEGER NOT NULL,
coins_earned INTEGER NOT NULL,
live_duration INTEGER DEFAULT 0,
stream_duration INTEGER DEFAULT 0,
video_duration INTEGER DEFAULT 0,
FOREIGN KEY (guildid, userid) REFERENCES members (guildid, userid) ON DELETE CASCADE
);
CREATE INDEX session_history_members ON session_history (guildid, userid, start_time);
"""
_tablename_ = "session_history"
class CurrentSession(RowModel):
"""
Schema
------
CREATE TABLE current_sessions(
guildid BIGINT NOT NULL,
userid BIGINT NOT NULL,
channelid BIGINT,
channel_type SessionChannelType,
rating INTEGER,
tag TEXT,
start_time TIMESTAMPTZ DEFAULT now(),
live_duration INTEGER DEFAULT 0,
live_start TIMESTAMPTZ,
stream_duration INTEGER DEFAULT 0,
stream_start TIMESTAMPTZ,
video_duration INTEGER DEFAULT 0,
video_start TIMESTAMPTZ,
hourly_coins INTEGER NOT NULL,
hourly_live_coins INTEGER NOT NULL,
FOREIGN KEY (guildid, userid) REFERENCES members (guildid, userid) ON DELETE CASCADE
);
CREATE UNIQUE INDEX current_session_members ON current_sessions (guildid, userid);
"""
_tablename_ = "current_sessions"
class VoiceSessionStats(RowModel):
"""
View containing voice session statistics.
Schema
------
CREATE VIEW voice_sessions_combined AS
SELECT
userid,
guildid,
start_time,
duration,
(start_time + duration * interval '1 second') AS end_time
FROM session_history
UNION ALL
SELECT
userid,
guildid,
start_time,
EXTRACT(EPOCH FROM (NOW() - start_time)) AS duration,
NOW() AS end_time
FROM current_sessions;
"""
_tablename_ = "voice_sessions_combined"
userid = Integer()
guildid = Integer()
start_time = Timestamp()
duration = Integer()
end_time = Timestamp()
@classmethod
async def study_time_between(cls, guildid: int, userid: int, _start, _end) -> int:
conn = cls._connector.get_connection()
async with conn.cursor() as cursor:
await cursor.execute(
"SELECT study_time_between(%s, %s, %s, %s)",
(guildid, userid, _start, _end)
)
return (await cursor.fetchone()[0]) or 0
@classmethod
async def study_times_between(cls, guildid: int, userid: int, *points) -> list[int]:
if len(points) < 2:
raise ValueError('Not enough block points given!')
blocks = zip(points, points[1:])
query = sql.SQL(
"""
SELECT
study_time_between(%s, %s, t._start, t._end) AS stime
FROM
(VALUES {})
AS
t (_start, _end)
"""
).format(
sql.SQL(', ').join(
sql.SQL("({}, {})").format(sql.Placeholder(), sql.Placeholder()) for _ in points[1:]
)
)
conn = await cls._connector.get_connection()
async with conn.cursor() as cursor:
await cursor.execute(
query,
tuple(chain((guildid, userid), *blocks))
)
return [r['stime'] or 0 for r in await cursor.fetchall()]
@classmethod
async def study_time_since(cls, guildid: int, userid: int, _start) -> int:
conn = cls._connector.get_connection()
async with conn.cursor() as cursor:
await cursor.execute(
"SELECT study_time_since(%s, %s, %s)",
(guildid, userid, _start)
)
return (await cursor.fetchone()[0]) or 0
@classmethod
async def study_times_since(cls, guildid: int, userid: int, *starts) -> int:
if len(starts) < 1:
raise ValueError('No starting points given!')
query = sql.SQL(
"""
SELECT
study_time_since(%s, %s, t._start) AS stime
FROM
(VALUES {})
AS
t (_start)
"""
).format(
sql.SQL(', ').join(
sql.SQL("({})").format(sql.Placeholder()) for _ in starts
)
)
conn = await cls._connector.get_connection()
async with conn.cursor() as cursor:
await cursor.execute(
query,
tuple(chain((guildid, userid), starts))
)
return [r['stime'] or 0 for r in await cursor.fetchall()]
class ProfileTag(RowModel):
"""
Schema
------
CREATE TABLE member_profile_tags(
tagid SERIAL PRIMARY KEY,
guildid BIGINT NOT NULL,
userid BIGINT NOT NULL,
tag TEXT NOT NULL,
_timestamp TIMESTAMPTZ DEFAULT now(),
FOREIGN KEY (guildid, userid) REFERENCES members (guildid, userid)
);
CREATE INDEX member_profile_tags_members ON member_profile_tags (guildid, userid);
"""
_tablename_ = 'member_profile_tags'
tagid = Integer(primary=True)
guildid = Integer()
userid = Integer()
tag = String()
_timestamp = Timestamp()
@classmethod
async def fetch_tags(self, guildid: Optional[int], userid: int):
tags = await self.fetch_where(guildid=guildid, userid=userid)
if not tags and guildid is not None:
tags = await self.fetch_where(guildid=None, userid=userid)
return [tag.tag for tag in tags]
@classmethod
async def set_tags(self, guildid: Optional[int], userid: int, tags: Iterable[str]):
conn = await self._connector.get_connection()
async with conn.transaction():
await self.table.delete_where(guildid=guildid, userid=userid)
if tags:
await self.table.insert_many(
('guildid', 'userid', 'tag'),
*((guildid, userid, tag) for tag in tags)
)
class WeeklyGoals(RowModel):
"""
Schema
------
CREATE TABLE member_weekly_goals(
guildid BIGINT NOT NULL,
userid BIGINT NOT NULL,
weekid INTEGER NOT NULL, -- Epoch time of the start of the UTC week
study_goal INTEGER,
task_goal INTEGER,
_timestamp TIMESTAMPTZ DEFAULT now(),
PRIMARY KEY (guildid, userid, weekid),
FOREIGN KEY (guildid, userid) REFERENCES members (guildid, userid) ON DELETE CASCADE
);
CREATE INDEX member_weekly_goals_members ON member_weekly_goals (guildid, userid);
"""
_tablename_ = 'member_weekly_goals'
guildid = Integer(primary=True)
userid = Integer(primary=True)
weekid = Integer(primary=True)
study_goal = Integer()
task_goal = Integer()
_timestamp = Timestamp()
class WeeklyTasks(RowModel):
"""
Schema
------
CREATE TABLE member_weekly_goal_tasks(
taskid SERIAL PRIMARY KEY,
guildid BIGINT NOT NULL,
userid BIGINT NOT NULL,
weekid INTEGER NOT NULL,
content TEXT NOT NULL,
completed BOOLEAN NOT NULL DEFAULT FALSE,
_timestamp TIMESTAMPTZ DEFAULT now(),
FOREIGN KEY (weekid, guildid, userid) REFERENCES member_weekly_goals (weekid, guildid, userid) ON DELETE CASCADE
);
CREATE INDEX member_weekly_goal_tasks_members_weekly ON member_weekly_goal_tasks (guildid, userid, weekid);
"""
_tablename_ = 'member_weekly_goal_tasks'
taskid = Integer(primary=True)
guildid = Integer()
userid = Integer()
weekid = Integer()
content = String()
completed = Bool()
_timestamp = Timestamp()
class MonthlyGoals(RowModel):
"""
Schema
------
CREATE TABLE member_monthly_goals(
guildid BIGINT NOT NULL,
userid BIGINT NOT NULL,
monthid INTEGER NOT NULL, -- Epoch time of the start of the UTC month
study_goal INTEGER,
task_goal INTEGER,
_timestamp TIMESTAMPTZ DEFAULT now(),
PRIMARY KEY (guildid, userid, monthid),
FOREIGN KEY (guildid, userid) REFERENCES members (guildid, userid) ON DELETE CASCADE
);
CREATE INDEX member_monthly_goals_members ON member_monthly_goals (guildid, userid);
"""
_tablename_ = 'member_monthly_goals'
guildid = Integer(primary=True)
userid = Integer(primary=True)
monthid = Integer(primary=True)
study_goal = Integer()
task_goal = Integer()
_timestamp = Timestamp()
class MonthlyTasks(RowModel):
"""
Schema
------
CREATE TABLE member_monthly_goal_tasks(
taskid SERIAL PRIMARY KEY,
guildid BIGINT NOT NULL,
userid BIGINT NOT NULL,
monthid INTEGER NOT NULL,
content TEXT NOT NULL,
completed BOOLEAN NOT NULL DEFAULT FALSE,
_timestamp TIMESTAMPTZ DEFAULT now(),
FOREIGN KEY (monthid, guildid, userid) REFERENCES member_monthly_goals (monthid, guildid, userid) ON DELETE CASCADE
);
CREATE INDEX member_monthly_goal_tasks_members_monthly ON member_monthly_goal_tasks (guildid, userid, monthid);
"""
_tablename_ = 'member_monthly_goal_tasks'
taskid = Integer(primary=True)
guildid = Integer()
userid = Integer()
monthid = Integer()
content = String()
completed = Bool()
_timestamp = Timestamp()

View File

@@ -0,0 +1,91 @@
from datetime import timedelta
from psycopg.sql import SQL
from data import NULL
from meta import LionBot
from gui.cards import WeeklyGoalCard, MonthlyGoalCard
from gui.base import CardMode
from ..data import StatsData
from ..lib import extract_weekid, extract_monthid, apply_week_offset, apply_month_offset
async def get_goals_card(
bot: LionBot, userid: int, guildid: int, offset: int, weekly: bool, mode: CardMode
):
data: StatsData = bot.get_cog('StatsCog').data
lion = await bot.core.lions.fetch(guildid, userid)
today = lion.today
# Calculate periodid and select the correct model
if weekly:
goal_model = data.WeeklyGoals
tasks_model = data.WeeklyTasks
start = today - timedelta(days=today.weekday())
start, end = apply_week_offset(start, offset), apply_week_offset(start, offset - 1)
periodid = extract_weekid(start)
key = {'guildid': guildid, 'userid': userid, 'weekid': periodid}
else:
goal_model = data.MonthlyGoals
tasks_model = data.MonthlyTasks
start = today.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
start, end = apply_month_offset(start, offset), apply_month_offset(start, offset - 1)
periodid = extract_monthid(start)
key = {'guildid': guildid, 'userid': userid, 'monthid': periodid}
# Extract goals and tasks
goals = await goal_model.fetch_or_create(*key.values())
task_rows = await tasks_model.fetch_where(**key)
tasks = [(i, row.content, bool(row.completed)) for i, row in enumerate(task_rows)]
# Compute task progress
task_data = bot.get_cog('TasklistCog').data
task_model = task_data.Task
task_query = task_model.table.select_where(
task_model.completed_at != NULL,
task_model.completed_at >= start,
task_model.completed_at <= end,
userid=userid,
).select(total=SQL('COUNT(*)')).with_no_adapter()
results = await task_query
tasks_completed = results[0]['total'] if results else 0
# Set and compute correct middle goal column
# if mode in (CardMode.VOICE, CardMode.STUDY):
if True:
model = data.VoiceSessionStats
middle_completed = (await model.study_times_between(guildid or None, userid, start, end))[0]
middle_goal = goals['study_goal']
# Compute schedule session progress
# TODO
sessions_complete = 0.5
# Get member profile
if member := await lion.get_member():
username = (member.display_name, member.discriminator)
avatar = member.avatar.key
else:
username = (lion.data.display_name, '#????')
avatar = lion.user_data.avatar_hash
# Getch badges
badges = await data.ProfileTag.fetch_tags(guildid, userid)
card_cls = WeeklyGoalCard if weekly else MonthlyGoalCard
card = card_cls(
name=username[0],
discrim=username[1],
avatar=(userid, avatar),
badges=badges,
tasks_done=tasks_completed,
tasks_goal=goals['task_goal'],
studied_hours=middle_completed,
studied_goal=middle_goal,
attendance=sessions_complete,
goals=tasks,
date=today,
skin={'mode': mode}
)
return card

View File

@@ -0,0 +1,96 @@
from typing import Optional
from datetime import timedelta
import calendar
from data import ORDER
from meta import LionBot
from gui.cards import MonthlyStatsCard
from gui.base import CardMode
from ..data import StatsData
from ..lib import apply_month_offset
async def get_monthly_card(bot: LionBot, userid: int, guildid: int, offset: int, mode: CardMode) -> MonthlyStatsCard:
data: StatsData = bot.get_cog('StatsCog').data
lion = await bot.core.lions.fetch(guildid, userid)
today = lion.today
month_start = today.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
target = apply_month_offset(month_start, offset)
target_end = (target + timedelta(days=40)).replace(day=1, hour=0, minute=0) - timedelta(days=1)
months = [target]
for i in range(0, 3):
months.append((months[-1] - timedelta(days=1)).replace(day=1))
months.reverse()
monthly = [
[0]*(calendar.monthrange(month.year, month.month)[1]) for month in months
]
# TODO: Select model based on card mode
model = data.VoiceSessionStats
# Get first session
query = model.table.select_where().order_by('start_time', ORDER.ASC).limit(1)
if guildid:
query = query.where(userid=userid, guildid=guildid)
else:
query = query.where(userid=userid)
results = await query
first_session = results[0]['start_time'] if results else None
if not first_session:
current_streak = 0
longest_streak = 0
else:
first_day = first_session.replace(hour=0, minute=0, second=0, microsecond=0)
first_month = first_day.replace(day=1)
# Build list of day starts up to now, or end of requested month
requests = []
end_of_req = target_end if offset else today
day = first_day
while day <= end_of_req:
day = day + timedelta(days=1)
requests.append(day)
# Request times between requested days
day_stats = await model.study_times_between(guildid or None, userid, *requests)
# Compute current streak and longest streak
current_streak = 0
longest_streak = 0
for day in day_stats:
if day > 0:
current_streak += 1
longest_streak = max(current_streak, longest_streak)
else:
current_streak = 0
# Populate monthly
offsets = {(month.year, month.month): i for i, month in enumerate(months)}
for day, stat in zip(reversed(requests[:-1]), reversed(day_stats)):
if day < months[0]:
break
i = offsets[(day.year, day.month)]
monthly[i][day.day - 1] = stat / 3600
# Get member profile
if member := await lion.get_member():
username = (member.display_name, member.discriminator)
else:
username = (lion.data.display_name, '#????')
# Request card
card = MonthlyStatsCard(
user=username,
timezone=str(lion.timezone),
now=lion.now.timestamp(),
month=int(target.timestamp()),
monthly=monthly,
current_streak=current_streak,
longest_streak=longest_streak,
skin={'mode': mode}
)
return card

View File

@@ -0,0 +1,63 @@
from typing import Optional
from datetime import datetime, timedelta
import discord
from meta import LionBot
from gui.cards import StatsCard
from ..data import StatsData
async def get_stats_card(bot: LionBot, userid: int, guildid: int):
data: StatsData = bot.get_cog('StatsCog').data
# TODO: Workouts
# TODO: Leaderboard rankings
guildid = guildid or 0
lion = await bot.core.lions.fetch(guildid, userid)
# Calculate the period timestamps, i.e. start time for each summary period
# TODO: Don't do the alltime one like this, not efficient anymore
# TODO: Unless we rewrite study_time_since again?
today = lion.today
month_start = today.replace(day=1)
period_timestamps = (
datetime(1970, 1, 1),
month_start,
today - timedelta(days=today.weekday()),
today
)
# Extract the study times for each period
study_times = await data.VoiceSessionStats.study_times_since(guildid, userid, *period_timestamps)
print("Study times", study_times)
# Calculate streak data by requesting times per day
# First calculate starting timestamps for each day
days = list(range(0, today.day + 2))
day_timestamps = [month_start + timedelta(days=day - 1) for day in days]
study_times = await data.VoiceSessionStats.study_times_between(guildid, userid, *day_timestamps)
print("Study times", study_times)
# Then extract streak tuples
streaks = []
streak_start = None
for day, stime in zip(days, study_times):
stime = stime or 0
if stime > 0 and streak_start is None:
streak_start = day
elif stime == 0 and streak_start is not None:
streaks.append((streak_start, day-1))
streak_start = None
if streak_start is not None:
streaks.append((streak_start, today.day))
card = StatsCard(
(0, 0),
list(reversed(study_times)),
100,
streaks,
)
return card

View File

@@ -0,0 +1,54 @@
from typing import Optional
from datetime import timedelta
from data import ORDER
from meta import LionBot
from gui.cards import WeeklyStatsCard
from gui.base import CardMode
from ..data import StatsData
async def get_weekly_card(bot: LionBot, userid: int, guildid: int, offset: int, mode: CardMode) -> WeeklyStatsCard:
data: StatsData = bot.get_cog('StatsCog').data
lion = await bot.core.lions.fetch(guildid, userid)
today = lion.today
week_start = today - timedelta(days=today.weekday()) - timedelta(weeks=offset)
days = [week_start + timedelta(i) for i in range(-7, 7 if offset else (today.weekday() + 1))]
# TODO: Select statistics model based on mode
model = data.VoiceSessionStats
# Get user session rows
query = model.table.select_where()
if guildid:
query = query.where(userid=userid, guildid=guildid).order_by('start_time', ORDER.ASC)
else:
query = query.where(userid=userid)
sessions = await query
# Extract quantities per-day
day_stats = await model.study_times_between(guildid or None, userid, *days)
for i in range(14 - len(day_stats)):
day_stats.append(0)
# Get member profile
if member := await lion.get_member():
username = (member.display_name, member.discriminator)
else:
username = (lion.data.display_name, '#????')
card = WeeklyStatsCard(
user=username,
timezone=str(lion.timezone),
now=lion.now.timestamp(),
week=week_start.timestamp(),
daily=tuple(map(lambda n: n/3600, day_stats)),
sessions=[
(int(session['start_time'].timestamp()), int(session['end_time'].timestamp()))
for session in sessions
],
skin={'mode': mode}
)
return card

View File

@@ -0,0 +1,46 @@
from datetime import timedelta
import pytz
def extract_weekid(timestamp) -> int:
"""
Extract a weekid from a given timestamp with timezone.
Weekids are calculated by first stripping the timezone,
then extracting the UTC timestamp of the start of the week.
"""
day_start = timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
week_start = day_start - timedelta(days=day_start.weekday())
return int(week_start.replace(tzinfo=pytz.utc).timestamp())
def extract_monthid(timestamp) -> int:
"""
Extract a monthid from a given timestamp with timezone.
Monthids are calculated by first stripping the timezone,
then extracting the UTC timestamp from the start of the month.
"""
month_start = timestamp.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
return int(month_start.replace(tzinfo=pytz.utc).timestamp())
def apply_week_offset(timestamp, offset):
return timestamp - timedelta(weeks=offset)
def apply_month_offset(timestamp, offset):
raw_month = timestamp.month - offset - 1
timestamp = timestamp.replace(
year=timestamp.year + int(raw_month // 12),
month=(raw_month % 12) + 1
)
return timestamp
def week_difference(ts_1, ts_2):
return int((ts_2 - ts_1).total_seconds() // (7*24*3600))
def month_difference(ts_1, ts_2):
return (ts_2.month - ts_1.month) + (ts_2.year - ts_1.year) * 12

View File

@@ -0,0 +1,3 @@
# Questions
- New GUI designs or sample layouts for each card in general mode?
- New achievements? Should they be customisable?

View File

@@ -0,0 +1,2 @@
from .profile import ProfileUI
from .weeklymonthly import WeeklyMonthlyUI

View File

@@ -0,0 +1,111 @@
from typing import Optional
from enum import IntEnum
import asyncio
import gc
import discord
from discord.ui.button import ButtonStyle, button, Button
from discord.ui.text_input import TextInput, TextStyle
from discord.ui.select import select, Select, SelectOption
from meta import LionBot, LionCog, conf
from meta.errors import UserInputError
from utils.lib import MessageArgs
from utils.ui import LeoUI, ModalRetryUI, FastModal, error_handler_for
from babel.translator import ctx_translator
from gui.cards import ProfileCard, StatsCard
from ..graphics.stats import get_stats_card
from ..data import StatsData
from .. import babel
_p = babel._p
class StatsUI(LeoUI):
def __init__(self, bot, user, guild, **kwargs):
super().__init__(**kwargs)
self.bot: LionBot = bot
self.user: discord.User | discord.Member = user
self.guild: Optional[discord.Guild] = guild
# State
self._showing_global = self.guild is None
self._refresh_lock = asyncio.Lock()
# Original interaction, response is used to display UI
self._original: Optional[discord.Interaction] = None
@property
def guildid(self) -> Optional[int]:
"""
ID of guild to render stats for, or None if global.
"""
return self.guild.id if not self._showing_global else None
@property
def userid(self) -> int:
"""
ID of user to render stats for.
"""
return self.user.id
async def interaction_check(self, interaction: discord.Interaction):
return interaction.user.id == self.user.id
async def cleanup(self):
if self._original and not self._original.is_expired():
try:
await self._original.edit_original_response(view=None)
except discord.HTTPException:
pass
self._original = None
@button(emoji=conf.emojis.cancel, style=ButtonStyle.red)
async def close_button(self, press: discord.Interaction, pressed: Button):
"""
Delete the output message and close the UI.
"""
await press.response.defer()
await self._original.delete_original_response()
self._original = None
await self.close()
async def close_button_refresh(self):
pass
async def refresh_components(self):
raise NotImplementedError
async def reload(self):
raise NotImplementedError
async def make_message(self) -> MessageArgs:
raise NotImplementedError
async def redraw(self, thinking: Optional[discord.Interaction] = None):
"""
Redraw the UI.
If a thinking interaction is provided,
deletes the response while redrawing.
"""
args = await self.make_message()
if thinking is not None and not thinking.is_expired() and thinking.response.is_done():
asyncio.create_task(thinking.delete_original_response())
await self._original.edit_original_response(**args.edit_args, view=self)
async def refresh(self, thinking: Optional[discord.Interaction] = None):
"""
Refresh the UI.
"""
async with self._refresh_lock:
await self.reload()
await self.refresh_components()
await self.redraw(thinking=thinking)
async def run(self, interaction: discord.Interaction):
"""
Execute the UI using the given interaction.
"""
raise NotImplementedError

View File

@@ -0,0 +1,176 @@
from typing import Optional
from enum import Enum
import asyncio
import discord
from discord.ui.button import button, Button, ButtonStyle
from discord.ui.select import select, Select, SelectOption
from utils.lib import MessageArgs
from .. import babel
from .base import StatsUI
from gui.cards import WeeklyStatsCard, MonthlyStatsCard, WeeklyGoalCard, MonthlyGoalCard
_p = babel._p
class SessionType(Enum):
Voice = 0
Text = 1
Anki = 2
class GoalBaseUI(StatsUI):
"""
switcher row, local|global
voice, text, anki
Prev, Select, Next, Edit Goals
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__rendered = {} # (offset, type) |-> (goal_card, stats_card)
self._offset: int = 0
self._stat_type: Optional[SessionType] = None
@property
def _key(self):
return (self._offset, self._stat_type)
@property
def _rendered(self):
return self.__rendered.get(self._key, None) is not None
async def lazy_rerun_using(self, interaction: discord.Interaction):
if self._rendered:
await interaction.response.defer()
waiting = None
else:
await interaction.response.defer(thinking=True)
waiting = interaction
await self.run(waiting_interaction=waiting)
@button(label='VOICE_PLACEHOLDER')
async def voice_pressed(self, press: discord.Interaction, pressed: Button):
self._stat_type = SessionType.Voice
await self.lazy_rerun_using(press)
@button(label='TEXT_PLACEHOLDER')
async def text_pressed(self, press: discord.Interaction, pressed: Button):
self._stat_type = SessionType.Text
await self.lazy_rerun_using(press)
@button(label='ANKI_PLACEHOLDER')
async def anki_pressed(self, press: discord.Interaction, pressed: Button):
self._stat_type = SessionType.Anki
await self.lazy_rerun_using(press)
@button(label="PREV_PLACEHOLDER")
async def prev_pressed(self, press: discord.Interaction, pressed: Button):
self._offset -= 1
await self.lazy_rerun_using(press)
@button(label="NEXT_PLACEHOLDER")
async def next_pressed(self, press: discord.Interaction, pressed: Button):
self._offset += 1
await self.lazy_rerun_using(press)
@button(label="SELECT_PLACEHOLDER")
async def next_pressed(self, press: discord.Interaction, pressed: Button):
# TODO: Date selection
...
@button(label="EDIT_PLACEHOLDER")
async def edit_pressed(self, press: discord.Interaction, pressed: Button):
# TODO: Goal editing
...
class MonthlyUI(StatsUI):
_ui_name = _p('ui:MonthlyUI|name', 'Monthly')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._rendered = False
self._stats_card: Optional[MonthlyStatsCard] = None
self._goals_card: Optional[MonthlyGoalCard] = None
async def redraw(self):
self._layout = [
(*self._switcher_buttons, self.toggle_pressed)
]
await super().redraw()
async def make_message(self) -> MessageArgs:
if not self._rendered:
await self._render()
stats_file = self._stats_card.as_file('monthly_stats.png')
goals_file = self._goals_card.as_file('monthly_goals.png')
return MessageArgs(files=[goals_file, stats_file])
async def _render(self):
await asyncio.gather(self._render_goals(), self._render_stats())
self._rendered = True
async def _render_stats(self):
args = await MonthlyStatsCard.sample_args(None)
card = MonthlyStatsCard(**args)
await card.render()
self._stats_card = card
return card
async def _render_goals(self):
args = await MonthlyGoalCard.sample_args(None)
card = WeeklyGoalCard(**args)
await card.render()
self._goals_card = card
return card
class WeeklyUI(StatsUI):
_ui_name = _p('ui:WeeklyUI|name', 'Weekly')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._rendered = False
self._stats_card: Optional[WeeklyStatsCard] = None
self._goals_card: Optional[WeeklyGoalCard] = None
async def redraw(self):
self._layout = [
[*self._switcher_buttons]
]
if self.guild:
self._layout[0].append(self.toggle_pressed)
await super().redraw()
async def _render(self):
await asyncio.gather(self._render_goals(), self._render_stats())
self._rendered = True
async def make_message(self) -> MessageArgs:
if not self._rendered:
await self._render()
stats_file = self._stats_card.as_file('weekly_stats.png')
goals_file = self._goals_card.as_file('weekly_goals.png')
return MessageArgs(files=[goals_file, stats_file])
async def _render_stats(self):
args = await WeeklyStatsCard.sample_args(None)
card = WeeklyStatsCard(**args)
await card.render()
self._stats_card = card
return card
async def _render_goals(self):
args = await WeeklyGoalCard.sample_args(None)
card = WeeklyGoalCard(**args)
await card.render()
self._goals_card = card
return card

View File

View File

View File

@@ -0,0 +1,325 @@
from typing import Optional
from enum import IntEnum
import asyncio
import discord
from discord.ui.button import ButtonStyle, button, Button
from discord.ui.text_input import TextInput, TextStyle
from discord.ui.select import select, Select, SelectOption
from meta import LionBot, LionCog, conf
from meta.errors import UserInputError
from utils.lib import MessageArgs
from utils.ui import LeoUI, ModalRetryUI, FastModal, error_handler_for
from babel.translator import ctx_translator
from gui.cards import ProfileCard, StatsCard
from ..graphics.stats import get_stats_card
from ..data import StatsData
from .. import babel
from .base import StatsUI
_p = babel._p
class ProfileEditor(FastModal):
limit = 5
editor = TextInput(
label='',
style=TextStyle.long,
max_length=100,
required=False
)
def setup_editor(self):
t = ctx_translator.get().t
self.editor.label = t(_p(
'modal:profile_editor|field:editor|label', "Profile Tags (One line per tag)"
))
self.editor.placeholder = t(_p(
'modal:profile_editor|field:editor|placeholder',
"Mathematician\n"
"Loves Cats"
))
def setup(self):
t = ctx_translator.get().t
self.title = t(_p(
'modal:profile_editor|title',
"Profile Tag Editor"
))
self.setup_editor()
def __init__(self, **kwargs):
self.setup()
super().__init__(**kwargs)
async def parse(self):
new_tags = (tag.strip() for tag in self.editor.value.splitlines())
new_tags = [tag for tag in new_tags if tag]
# Validate tags
if len(new_tags) > ProfileEditor.limit:
t = ctx_translator.get().t
raise UserInputError(
t(_p(
'modal:profile_editor|error:too_many_tags',
"Too many tags! You can have at most `{limit}` profile tags."
)).format(limit=ProfileEditor.limit)
)
# TODO: Per tag length validation
return new_tags
@error_handler_for(UserInputError)
async def rerequest(self, interaction: discord.Interaction, error: UserInputError):
await ModalRetryUI(self, error.msg).respond_to(interaction)
class StatType(IntEnum):
VOICE = 0
TEXT = 1
ANKI = 2
def select_name(self):
if self is self.VOICE:
# TODO: Handle study and general modes
name = _p(
'menu:stat_type|opt:voice|name',
"Voice Statistics"
)
elif self is self.TEXT:
name = _p(
'menu:stat_type|opt:text|name',
"Text Statistics"
)
elif self is self.ANKI:
name = _p(
'menu:stat_type|opt:anki|name',
"Anki Statistics"
)
return name
class ProfileUI(StatsUI):
def __init__(self, bot, user, guild, **kwargs):
super().__init__(bot, user, guild, **kwargs)
# State
self._stat_type = StatType.VOICE
self._showing_stats = False
# Card data for rendering
self._profile_card: Optional[ProfileCard] = None
self._xp_card = None
self._stats_card: Optional[StatsCard] = None
self._stats_future: Optional[asyncio.Future] = None
@select(placeholder="...")
async def type_menu(self, selection: discord.Interaction, menu: Select):
value = int(menu.values[0])
if self._stat_type != value:
await selection.response.defer(thinking=True, ephemeral=True)
self._stat_type = StatType(value)
# Clear card state for reload
self._stats_card = None
if self._stats_future is not None and not self._stats_future.done():
self._stats_future.cancel()
self._stats_future = None
await self.refresh(thinking=selection)
else:
await selection.response.defer()
async def type_menu_refresh(self):
# TODO: Check enabled types
t = self.bot.translator.t
options = []
for item in StatType:
option = SelectOption(label=t(item.select_name()), value=str(item.value))
option.default = item is self._stat_type
options.append(option)
self.type_menu.options = options
@button(label="Edit Profile", style=ButtonStyle.blurple)
async def edit_button(self, press: discord.Interaction, pressed: Button):
"""
Press to open the profile tag editor.
Opens a ProfileEditor modal with error-rerun handling.
"""
t = self.bot.translator.t
data: StatsData = self.bot.get_cog('StatsCog').data
tags = await data.ProfileTag.fetch_tags(self.guildid, self.userid)
modal = ProfileEditor()
modal.editor.default = '\n'.join(tags)
@modal.submit_callback()
async def parse_tags(interaction: discord.Interaction):
new_tags = await modal.parse()
await interaction.response.defer(thinking=True, ephemeral=True)
# Set the new tags and refresh
await data.ProfileTag.set_tags(self.guildid, self.userid, new_tags)
if self._original is not None:
self._profile_card = None
await self.refresh(thinking=interaction)
else:
# Corner case where the UI has expired or been closed
embed = discord.Embed(
colour=discord.Colour.brand_green(),
description=t(_p(
'modal:profile_editor|resp:success',
"Your profile has been updated!"
))
)
await interaction.edit_original_response(embed=embed)
await press.response.send_modal(modal)
async def edit_button_refresh(self):
...
@button(label="Show Statistics", style=ButtonStyle.blurple)
async def stats_button(self, press: discord.Interaction, pressed: Button):
"""
Press to show or hide the statistics panel.
"""
self._showing_stats = not self._showing_stats
if self._stats_card or not self._showing_stats:
await press.response.defer()
await self.refresh()
else:
await press.response.defer(thinking=True, ephemeral=True)
await self.refresh(thinking=press)
async def stats_button_refresh(self):
button = self.stats_button
if self._showing_stats:
button.label = "Hide Statistics"
else:
button.label = "Show Statistics"
@button(label="Global Stats", style=ButtonStyle.blurple)
async def global_button(self, press: discord.Interaction, pressed: Button):
"""
Switch between local and global statistics modes.
This is only displayed when statistics are shown.
Also saves the value to user preferences.
"""
await press.response.defer(thinking=True, ephemeral=True)
self._showing_global = not self._showing_global
# TODO: Asynchronously update user preferences
# Clear card state for reload
self._stats_card = None
if self._stats_future is not None and not self._stats_future.done():
self._stats_future.cancel()
self._stats_future = None
await self.refresh(thinking=press if not self._showing_global else None)
if self._showing_global:
t = self.bot.translator.t
embed = discord.Embed(
colour=discord.Colour.orange(),
description=t(_p(
'ui:Profile|button:global|resp:success',
"You will now see statistics from all you servers (where applicable)! Press again to revert."
))
)
await press.edit_original_response(embed=embed)
async def global_button_refresh(self):
button = self.global_button
if self._showing_global:
button.label = "Server Statistics"
else:
button.label = "Global Statistics"
async def refresh_components(self):
"""
Refresh each UI component, and the overall layout.
"""
await asyncio.gather(
self.edit_button_refresh(),
self.global_button_refresh(),
self.stats_button_refresh(),
self.close_button_refresh(),
self.type_menu_refresh()
)
if self._showing_stats:
self._layout = [
(self.type_menu,),
(self.stats_button, self.global_button, self.edit_button, self.close_button)
]
else:
self._layout = [
(self.stats_button, self.edit_button, self.close_button)
]
async def _render_stats(self):
"""
Create and render the profile card.
"""
card = await get_stats_card(self.bot, self.userid, self.guildid)
await card.render()
self._stats_card = card
return card
async def _render_profile(self):
"""
Create and render the XP and stats cards.
"""
args = await ProfileCard.sample_args(None)
data: StatsData = self.bot.get_cog('StatsCog').data
args |= {'badges': await data.ProfileTag.fetch_tags(self.guildid, self.userid)}
card = ProfileCard(**args)
await card.render()
self._profile_card = card
return card
async def reload(self):
"""
Reload the UI data, applying cache where possible.
"""
# Render the cards if required
tasks = []
if self._profile_card is None:
profile_task = asyncio.create_task(self._render_profile())
tasks.append(profile_task)
if self._stats_card is None:
if self._stats_future is None or self._stats_future.done() or self._stats_future.cancelled():
self._stats_future = asyncio.create_task(self._render_stats())
if self._showing_stats:
tasks.append(self._stats_future)
if tasks:
await asyncio.gather(*tasks)
async def make_message(self) -> MessageArgs:
"""
Make the message arguments. Apply cache where possible.
"""
# Build the final message arguments
files = []
files.append(self._profile_card.as_file('profile.png'))
if self._showing_stats:
files.append(self._stats_card.as_file('stats.png'))
return MessageArgs(files=files)
async def run(self, interaction: discord.Interaction):
"""
Execute the UI using the given interaction.
"""
self._original = interaction
# TODO: Switch to using data cache in reload
self._showing_global = False
await self.refresh()

View File

@@ -0,0 +1,64 @@
from typing import Optional
import asyncio
import discord
from utils.lib import MessageArgs
from .. import babel
from .base import StatsUI
from gui.cards import StatsCard, ProfileCard
from ..graphics.stats import get_stats_card
_p = babel._p
class SummaryUI(StatsUI):
_ui_name = _p('ui:SummaryUI|name', 'Summary')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._rendered = False
self._stats_card: Optional[StatsCard] = None
self._profile_card: Optional[ProfileCard] = None
async def redraw(self):
if self.guild is not None:
self._layout = [
(*self._switcher_buttons, self.toggle_pressed)
]
else:
self._layout = [
self._switcher_buttons
]
await super().redraw()
async def make_message(self) -> MessageArgs:
if not self._rendered:
await self._render()
stats_file = self._stats_card.as_file('stats.png')
profile_file = self._profile_card.as_file('profile.png')
# TODO: Refresh peer timeouts on interaction usage
# TODO: Write close and cleanup
return MessageArgs(files=[profile_file, stats_file])
async def _render(self):
await asyncio.gather(self._render_stats(), self._render_profile())
self._rendered = True
async def _render_stats(self):
card = await get_stats_card(self.bot, self.data, self.user.id, self.guild.id if self.guild else None)
await card.render()
self._stats_card = card
return card
async def _render_profile(self):
args = await ProfileCard.sample_args(None)
card = ProfileCard(**args)
await card.render()
self._profile_card = card
return card

View File

@@ -0,0 +1,866 @@
from typing import Optional, TypeAlias, Union
from enum import Enum, IntEnum
from datetime import timedelta, datetime
from dataclasses import dataclass
from collections import defaultdict
import asyncio
from asyncio import Future
import gc
import re
import discord
from discord.ui.button import ButtonStyle, button, Button
from discord.ui.text_input import TextInput, TextStyle
from discord.ui.select import select, Select, SelectOption
from meta import LionBot, LionCog, conf
from meta.errors import UserInputError
from utils.lib import MessageArgs
from utils.ui import LeoUI, ModalRetryUI, FastModal, error_handler_for
from babel.translator import ctx_translator, LazyStr
from babel.utils import local_month
from gui.cards import WeeklyGoalCard, WeeklyStatsCard, MonthlyGoalCard, MonthlyStatsCard
from gui.base import CardMode
from core.lion import Lion
from ..graphics.weekly import get_weekly_card
from ..graphics.monthly import get_monthly_card
from ..graphics.goals import get_goals_card
from ..data import StatsData
from .. import babel
from ..lib import (
extract_monthid, extract_weekid, apply_month_offset, apply_week_offset, month_difference, week_difference
)
from .base import StatsUI
_p = babel._p
GoalCard: TypeAlias = Union[WeeklyGoalCard, MonthlyGoalCard]
StatsCard: TypeAlias = Union[WeeklyStatsCard, MonthlyStatsCard]
class PeriodType(IntEnum):
WEEKLY = 0
MONTHLY = 1
class StatType(IntEnum):
VOICE = 0
TEXT = 1
ANKI = 2
class StatPage(Enum):
WEEKLY_VOICE = (0, PeriodType.WEEKLY, StatType.VOICE)
WEEKLY_TEXT = (1, PeriodType.WEEKLY, StatType.TEXT)
WEEKLY_ANKI = (2, PeriodType.WEEKLY, StatType.ANKI)
MONTHLY_VOICE = (3, PeriodType.MONTHLY, StatType.VOICE)
MONTHLY_TEXT = (4, PeriodType.MONTHLY, StatType.TEXT)
MONTHLY_ANKI = (5, PeriodType.MONTHLY, StatType.ANKI)
@classmethod
def from_value(cls, value: int) -> 'StatPage':
return next(item for item in cls if item.select_value == value)
@property
def period(self) -> PeriodType:
return self.value[1]
@property
def stat(self) -> StatType:
return self.value[2]
@property
def goal_key(self) -> tuple[str, str, str]:
if self.period == PeriodType.WEEKLY:
periodid = 'weekid'
elif self.period == PeriodType.MONTHLY:
periodid = 'monthlyid'
return ('guildid', 'userid', periodid)
@property
def select_value(self) -> int:
return self.value[0]
@property
def select_name(self) -> LazyStr:
if self.period is PeriodType.WEEKLY:
if self.stat is StatType.VOICE:
name = _p(
'menu:stat_type|opt:weekly_voice|name',
"Weekly Voice Statistics"
)
elif self.stat is StatType.TEXT:
name = _p(
'menu:stat_type|opt:weekly_text|name',
"Weekly Text Statistics"
)
elif self.stat is StatType.ANKI:
name = _p(
'menu:stat_type|opt:weekly_anki|name',
"Weekly Anki Statistics"
)
elif self.period is PeriodType.MONTHLY:
if self.stat is StatType.VOICE:
name = _p(
'menu:stat_type|opt:monthly_voice|name',
"Monthly Voice Statistics"
)
elif self.stat is StatType.TEXT:
name = _p(
'menu:stat_type|opt:monthly_text|name',
"Monthly Text Statistics"
)
elif self.stat is StatType.ANKI:
name = _p(
'menu:stat_type|opt:monthly_anki|name',
"Monthly Anki Statistics"
)
return name
class GoalEditor(FastModal):
limit = 10
task_regex = re.compile(r"\s*(?:\[\s*(?P<check>[^\]]+)?\s*\])?\s*(?P<task>.+)")
# First goal is usually tasks completed
first_goal = TextInput(
label='...',
style=TextStyle.short,
max_length=4,
required=False
)
def setup_first_goal(self):
t = ctx_translator.get().t
field = self.first_goal
field.label = t(_p(
'modal:goal_editor|field:task_goal|label',
"Task goal"
))
field.placeholder = t(_p(
'modal:goal_editor|field:task_goal|placeholder',
"Enter the number of tasklist tasks you aim to do"
))
async def parse_first_goal(self) -> Optional[int]:
t = ctx_translator.get().t
string = self.first_goal.value.strip()
if not string:
result = None
elif not string.isdigit():
raise UserInputError(t(_p(
'modal:goal_editor|field:task_goal|error:NAN',
"The provided task goal `{input}` is not a number! Please try again."
)).format(input=string)
)
else:
result = int(string)
return result
# Second goal is either message count, voice hours, or cards completed
second_goal = TextInput(
label='...',
style=TextStyle.short,
max_length=6,
required=False
)
def setup_second_goal(self) -> Optional[int]:
t = ctx_translator.get().t
field = self.second_goal
# TODO: Study vs Voice customisation
if self.stat_page.stat is StatType.VOICE:
field.label = t(_p(
'modal:goal_editor|field:voice_goal|label',
"Study time goal"
))
field.placeholder = t(_p(
'modal:goal_editor|field:voice_goal|placeholder',
"Enter a number of hours of study to aim for."
))
elif self.stat_page.stat is StatType.TEXT:
field.label = t(_p(
'modal:goal_editor|field:text_goal|label',
"Message goal"
))
field.placeholder = t(_p(
'modal:goal_editor|field:text_goal|placeholder',
"Enter a message count to aim for."
))
elif self.stat_page.stat is StatType.ANKI:
field.label = t(_p(
'modal:goal_editor|field:anki_goal|label',
"Card goal"
))
field.placeholder = t(_p(
'modal:goal_editor|field:anki_goal|label',
"Enter a number of card revisions to aim for."
))
async def parse_second_goal(self) -> Optional[int]:
t = ctx_translator.get().t
string = self.second_goal.value.strip()
if not string:
result = None
elif not string.isdigit():
if self.stat_page.stat is StatType.VOICE:
raise UserInputError(
t(_p(
'modal:goal_editor|field:voice_goal|error:NAN',
"The provided study time goal `{input}` is not a number! Please try again."
)).format(input=string)
)
elif self.stat_page.stat is StatType.TEXT:
raise UserInputError(
t(_p(
'modal:goal_editor|field:text_goal|error:NAN',
"The provided message goal `{input}` is not a number! Please try again."
))
)
elif self.stat_page.stat is StatType.ANKI:
raise UserInputError(
t(_p(
'modal:goal_editor|field:anki_goal|error:NAN',
"The provided card goal `{input}` is not a number! Please try again."
))
)
else:
result = int(string)
return result
# Both weekly and monthly have task goals, independent of mode
task_editor = TextInput(
label='',
style=TextStyle.long,
max_length=500,
required=False
)
def setup_task_editor(self):
t = ctx_translator.get().t
field = self.task_editor
if self.stat_page.period is PeriodType.WEEKLY:
field.label = t(_p(
'modal:goal_editor|field:weekly_task_editor|label',
"Tasks to complete this week (one per line)"
))
field.placeholder = t(_p(
'modal:goal_editor|field:weekly_task_editor|placeholder',
"[ ] Write my biology essay\n"
"[x] Complete the second maths assignment\n"
))
else:
field.label = t(_p(
'modal:goal_editor|field:monthly_task_editor|label',
"Tasks to complete this month (one per line)"
))
field.placeholder = t(_p(
'modal:goal_editor|field:monthly_task_editor|placeholder',
"[ ] Write my biology essay\n"
"[x] Complete the second maths assignment\n"
))
async def parse_task_editor(self) -> list[tuple[bool, str]]:
t = ctx_translator.get().t
task_lines = (line.strip() for line in self.task_editor.value.splitlines())
task_lines = [line for line in task_lines if line]
tasks: list[tuple[bool, str]] = []
for line in task_lines:
match = self.task_regex.match(line)
if not match:
# This should be essentially impossible
# since the regex is a wildcard
raise UserInputError(
t(_p(
'modal:goal_editor||field:task_editor|error:parse_general',
"Malformed task!\n`{input}`"
)).format(input=line)
)
# TODO Length validation
check = bool(match['check'])
task = match['task']
if not task or not (task := task.strip()):
continue
tasks.append((check, task))
return tasks
def setup(self):
t = ctx_translator.get().t
if self.stat_page.period is PeriodType.WEEKLY:
self.title = t(_p(
'modal:goal_editor|title',
"Weekly goal editor"
))
else:
self.title = t(_p(
'modal:goal_editor|monthly|title',
"Monthly goal editor"
))
self.setup_first_goal()
self.setup_second_goal()
self.setup_task_editor()
def __init__(self, stat_page: StatPage, *args, **kwargs):
self.stat_page = stat_page
self.setup()
super().__init__(*args, **kwargs)
async def parse(self) -> tuple[Optional[int], Optional[int], list[tuple[bool, str]]]:
"""
Parse goal editor submission.
Raises UserInputError with a human-readable message if the input cannot be parsed.
"""
first = await self.parse_first_goal()
second = await self.parse_second_goal()
tasks = await self.parse_task_editor()
return (first, second, tasks)
@error_handler_for(UserInputError)
async def rerequest(self, interaction: discord.Interaction, error: UserInputError):
await ModalRetryUI(self, error.msg).respond_to(interaction)
PageKey: TypeAlias = tuple[bool, int, StatPage]
class WeeklyMonthlyUI(StatsUI):
def __init__(self, bot, user, guild, **kwargs):
super().__init__(bot, user, guild, **kwargs)
self.data: StatsData = bot.get_cog('StatsCog').data
# State
self.lion: Optional[Lion] = None
self._stat_page: StatPage = StatPage.WEEKLY_VOICE
self._week_offset = 0
self._month_offset = 0
self._showing_selector = False
self._selector_cache = {} # (offset, StatPage) -> SelectMenu
self._selector_offset = defaultdict(lambda: 0) # StatPage -> top entry offset
self._selector_offset_limit: dict[tuple[bool, StatType], int] = {} # bottom entry offset
# Card data
self._card_cache: dict[PageKey, tuple[Future[GoalCard], Future[StatsCard]]] = {}
@property
def key(self) -> PageKey:
return (self._showing_global, self._offset, self._stat_page)
@property
def tasks(self):
"""
Return the render tasks for the current key.
"""
return self._card_cache.get(self.key, (None, None))
@property
def weekly(self):
"""
Whether the UI is in a weekly mode.
"""
return self._stat_page.period == PeriodType.WEEKLY
@property
def _offset(self):
"""
Return the current weekly or monthly offset, as appropriate.
"""
return self._week_offset if self.weekly else self._month_offset
@_offset.setter
def _offset(self, value):
if self.weekly:
self._week_offset = value
else:
self._month_offset = value
async def cleanup(self):
await super().cleanup()
# Card cache is potentially quite large, so explicitly garbage collect
del self._card_cache
gc.collect()
@select(placeholder="...")
async def type_menu(self, selection: discord.Interaction, menu: Select):
value = StatPage.from_value(int(menu.values[0]))
if self._stat_page is not value:
await selection.response.defer(thinking=True, ephemeral=True)
self._stat_page = value
await self.refresh(thinking=selection)
else:
await selection.response.defer()
async def type_menu_refresh(self):
# TODO: Check enabled types
t = self.bot.translator.t
options = []
for item in StatPage:
option = SelectOption(label=t(item.select_name), value=str(item.select_value))
option.default = item is self._stat_page
options.append(option)
self.type_menu.options = options
@button(label="Edit Goals", style=ButtonStyle.blurple)
async def edit_button(self, press: discord.Interaction, pressed: Button):
"""
Press to open the goal editor for this stat type.
"""
# Extract goal data
# Open goal modal
# Parse goal modal submit, validate input
# If validation is successful, then update goal list by replacement
t = self.bot.translator.t
now = self.lion.now
if self.weekly:
goal_model = self.data.WeeklyGoals
tasks_model = self.data.WeeklyTasks
weekid = extract_weekid(apply_week_offset(now, self._week_offset))
key = {'guildid': self.guildid, 'userid': self.userid, 'weekid': weekid}
else:
goal_model = self.data.MonthlyGoals
tasks_model = self.data.MonthlyTasks
monthid = extract_monthid(apply_month_offset(now, self._month_offset))
key = {'guildid': self.guildid, 'userid': self.userid, 'monthid': monthid}
if self._stat_page.stat is StatType.VOICE:
goal_keys = ('task_goal', 'study_goal')
elif self._stat_page.stat is StatType.TEXT:
goal_keys = ('task_goal', 'message_goal')
elif self._stat_page.stat is StatType.ANKI:
goal_keys = ('task_goal', 'card_goal')
goals = await goal_model.fetch_or_create(*key.values())
tasks = await tasks_model.fetch_where(**key)
modal = GoalEditor(self._stat_page)
orig_first = goals[goal_keys[0]]
orig_second = goals[goal_keys[1]]
modal.first_goal.default = str(orig_first) if orig_first is not None else None
modal.second_goal.default = str(orig_second) if orig_second is not None else None
if tasks:
tasklines = [f"[{'x' if task.completed else ' '}] {task.content}" for task in tasks]
modal.task_editor.default = '\n'.join(tasklines)
@modal.submit_callback()
async def parse_goals(interaction: discord.Interaction):
new_first, new_second, new_tasks = await modal.parse()
# Successful parse, ack the interaction
await interaction.response.defer(thinking=True)
modified = False
# Update the numerical goals, using the correct keys
if new_first != orig_first or new_second != orig_second:
modified = True
update_args = dict(zip(goal_keys, (new_first, new_second)))
await goals.update(**update_args)
# Update the tasklist
if len(new_tasks) != len(tasks) or not all(t == new_t for (t, new_t) in zip(tasks, new_tasks)):
modified = True
conn = await self.bot.db.get_connection()
async with conn.transaction():
await tasks_model.table.delete_where(**key)
if new_tasks:
await tasks_model.table.insert_many(
(*key.keys(), 'completed', 'content'),
*((*key.values(), *new_task) for new_task in new_tasks)
)
if modified:
# If either goal type was modified, clear the rendered cache and refresh
for page_key, (goalf, statf) in self._card_cache.items():
# If the stat period type is the same as the current period type
if page_key[2].period is self._stat_page.period:
self._card_cache[page_key] = (None, statf)
await self.refresh(thinking=interaction)
await press.response.send_modal(modal)
async def edit_button_refresh(self):
t = self.bot.translator.t
self.edit_button.disabled = (self._offset != 0)
self.edit_button.label = t(_p(
'ui:weeklymonthly|button:edit_goals|label',
"Edit Goals"
))
def _selector_option_for(self, offset: int, page_type: StatPage) -> SelectOption:
key = (offset, page_type)
if (option := self._selector_cache.get(key, None)) is None:
t = self.bot.translator.t
now = self.lion.now.replace(hour=0, minute=0, second=0, microsecond=0)
# Generate the option
if page_type.period is PeriodType.MONTHLY:
now = now.replace(day=1) # Ensures a valid date after applying month offset
target = apply_month_offset(now, offset)
format = _p(
'ui:weeklymonthly|menu:period|monthly|label',
"{month} {year}"
)
option = SelectOption(
label=t(format).format(
month=local_month(target.month),
year=target.year
),
value=str(offset)
)
else:
start = apply_week_offset(now, offset)
end = start + timedelta(weeks=1)
label_format = _p(
'ui:weeklymonthly|menu:period|weekly|label',
"{year} W{week}"
)
desc_format = _p(
'ui:weeklymonthly|menu:period|weekly|desc',
"{start_day} {start_month} {start_year} to {end_day} {end_month} {end_year}"
)
start_day, end_day = start.day, end.day
start_month, end_month = local_month(start.month, short=True), local_month(end.month, short=True)
start_year, end_year = start.year, end.year
option = SelectOption(
value=str(offset),
label=t(label_format).format(
year=start.year,
week=start.isocalendar().week
),
description=t(desc_format).format(
start_day=start_day, start_month=start_month, start_year=start_year,
end_day=end_day, end_month=end_month, end_year=end_year
)
)
# Add to cache
self._selector_cache[key] = option
return option
async def _fetch_bottom_offset(self, page_type: StatPage) -> int:
"""
Calculate the bottom-most selection offset for the given StatPage.
This is calculated based on the earliest study/text/card session for this user/member.
The result is cached in `self._selector_offset_limit`.
"""
cache_key = (self._showing_global, page_type)
if (result := self._selector_offset_limit.get(cache_key, None)) is None:
# Fetch first session for this page and global mode
data_key = {'userid': self.userid}
if not self._showing_global:
data_key['guildid'] = self.guildid
if page_type.stat is StatType.VOICE:
model = self.data.VoiceSessionStats
first_result = await model.table.select_one_where(**data_key).order_by('start_time')
if first_result is None:
result = 0
else:
now = self.lion.now
tz = self.lion.timezone
start = first_result['start_time'].astimezone(tz)
if page_type.period is PeriodType.WEEKLY:
result = week_difference(start, now)
else:
result = month_difference(start, now)
self._selector_offset_limit[cache_key] = result
return result
@button(label="Select Period", style=ButtonStyle.blurple)
async def select_button(self, press: discord.Interaction, pressed: Button):
"""
Press to open the period selector for this stat type.
"""
await press.response.defer(thinking=True, ephemeral=True)
self._showing_selector = not self._showing_selector
await self.refresh(thinking=press)
async def select_button_refresh(self):
t = self.bot.translator.t
button = self.select_button
if self._showing_selector:
button.label = t(_p(
'ui:weeklymonthly|button:period|close|label',
"Close Selector"
))
elif self.weekly:
button.label = t(_p(
'ui:weeklymonthly|button:period|weekly|label',
"Select Week"
))
else:
button.label = t(_p(
'ui:weeklymonthly|button:period|monthly|label',
"Select Month"
))
@select(placeholder='...', max_values=1)
async def period_menu(self, selection: discord.Interaction, menu: Select):
if menu.values:
await selection.response.defer(thinking=True)
result = int(menu.values[0])
if result == -1:
# More recent
# Change the selector offset for this selector key
current_start = self._selector_offset[self._stat_page]
new_start = current_start - 23 if current_start != 24 else 0
self._selector_offset[self._stat_page] = new_start
elif result == -2:
# More older
# Increase the selector offset for this selector key
current_start = self._selector_offset[self._stat_page]
new_start = current_start + 23 if current_start != 0 else 24
self._selector_offset[self._stat_page] = new_start
else:
# Set the page offset for this period type and refresh
self._offset = result
await self.refresh(thinking=selection)
else:
await selection.response.defer()
async def period_menu_refresh(self):
t = self.bot.translator.t
menu = self.period_menu
options = []
starting = self._selector_offset[self._stat_page] # Offset of first entry to display
if starting > 0:
# start with More ... (prev)
more_first = SelectOption(
value="-1",
label="More (More recent)"
)
options.append(more_first)
bottom = await self._fetch_bottom_offset(self._stat_page)
if bottom - starting + 1 + len(options) <= 24:
# Put all remaining entries into the options lise
for offset in range(starting, bottom + 1):
option = self._selector_option_for(offset, self._stat_page)
option.default = (offset == self._offset)
options.append(option)
else:
# Put the next 23 or 24 options there, and cap with a more option
for offset in range(starting, starting + 24 - len(options)):
option = self._selector_option_for(offset, self._stat_page)
option.default = (offset == self._offset)
options.append(option)
more_last = SelectOption(
value='-2',
label="More (Older)"
)
options.append(more_last)
menu.options = options
if self.weekly:
menu.placeholder = t(_p(
'ui:weeklymonthly|menu:period|weekly|placeholder',
"Select a week to display"
))
else:
menu.placeholder = t(_p(
'ui:weeklymonthly|menu:period|monthly|placeholder',
"Select a month to display"
))
@button(label="Global Stats", style=ButtonStyle.blurple)
async def global_button(self, press: discord.Interaction, pressed: Button):
"""
Switch between local and global statistics modes.
"""
await press.response.defer(thinking=True, ephemeral=True)
self._showing_global = not self._showing_global
# TODO: Asynchronously update user preferences
await self.refresh(thinking=press if not self._showing_global else None)
if self._showing_global:
t = self.bot.translator.t
embed = discord.Embed(
colour=discord.Colour.orange(),
description=t(_p(
'ui:WeeklyMonthly|button:global|resp:success',
"You will now see statistics from all you servers (where applicable)! Press again to revert."
))
)
await press.edit_original_response(embed=embed)
async def global_button_refresh(self):
button = self.global_button
if self._showing_global:
button.label = "Server Statistics"
else:
button.label = "Global Statistics"
async def refresh_components(self):
await asyncio.gather(
self.edit_button_refresh(),
self.select_button_refresh(),
self.global_button_refresh(),
self.close_button_refresh(),
self.type_menu_refresh(),
self.select_button_refresh(),
)
# TODO: Lazy refresh
self._layout = [
(self.type_menu,),
(self.edit_button, self.select_button, self.global_button, self.close_button)
]
if self._showing_selector:
await self.period_menu_refresh()
self._layout.append((self.period_menu,))
async def _tmp_fetch_goals(self):
data = self.data
now = self.lion.now
if self.weekly:
goal_model = data.WeeklyGoals
tasks_model = data.WeeklyTasks
weekid = extract_weekid(apply_week_offset(now, self._week_offset))
key = {'guildid': self.guildid, 'userid': self.userid, 'weekid': weekid}
else:
goal_model = data.MonthlyGoals
tasks_model = data.MonthlyTasks
now = now.replace(day=1) # Ensures a valid date after applying month offset
monthid = extract_monthid(apply_month_offset(now, self._month_offset))
key = {'guildid': self.guildid, 'userid': self.userid, 'monthid': monthid}
if self._stat_page.stat is StatType.VOICE:
goal_keys = ('task_goal', 'study_goal')
elif self._stat_page.stat is StatType.TEXT:
goal_keys = ('task_goal', 'message_goal')
elif self._stat_page.stat is StatType.ANKI:
goal_keys = ('task_goal', 'card_goal')
goals = await goal_model.fetch_or_create(*key.values())
tasks = await tasks_model.fetch_where(**key)
numbers = (goals.task_goal, goals.study_goal)
tasklist = [
(i, task.content, task.completed)
for i, task in enumerate(tasks)
]
return numbers, tasklist
async def _render_goals(self, show_global, offset, stat_page):
if stat_page.stat is StatType.VOICE:
mode = CardMode.VOICE
elif stat_page.stat is StatType.TEXT:
mode = CardMode.TEXT
elif stat_page.stats is StatType.ANKI:
mode = CardMode.ANKI
card = await get_goals_card(
self.bot,
self.userid,
self.guildid or 0,
offset,
(self._stat_page.period is PeriodType.WEEKLY),
mode
)
await card.render()
return card
async def _render_stats(self, show_global, offset, stat_page):
if stat_page.stat is StatType.VOICE:
mode = CardMode.VOICE
elif stat_page.stat is StatType.TEXT:
mode = CardMode.TEXT
elif stat_page.stats is StatType.ANKI:
mode = CardMode.ANKI
if stat_page.period == PeriodType.WEEKLY:
card = await get_weekly_card(
self.bot,
self.userid,
self.guildid,
offset,
mode
)
else:
card = await get_monthly_card(
self.bot,
self.userid,
self.guildid,
offset,
mode
)
await card.render()
return card
def _prepare(self, *key):
"""
Launch render tasks for the given offset and stat_type.
Avoids re-rendering if completed or already in progress.
"""
goal_task, stats_task = self._card_cache.get(key, (None, None))
if goal_task is None or goal_task.cancelled():
goal_task = asyncio.create_task(self._render_goals(*key))
if stats_task is None or stats_task.cancelled():
stats_task = asyncio.create_task(self._render_stats(*key))
tasks = (goal_task, stats_task)
self._card_cache[key] = tasks
return tasks
async def fetch_cards(self, *key):
"""
Render the cards for the current offset and stat type.
Avoids re-rendering.
Will raise asyncio.CancelledError if a rendering task is cancelled.
"""
tasks = self._prepare(*key)
await asyncio.gather(*tasks)
return (tasks[0].result(), tasks[1].result())
async def reload(self):
"""
Reload the UI data, applying cache where possible.
"""
await self.fetch_cards(*self.key)
async def make_message(self) -> MessageArgs:
goal_card, stats_card = await self.fetch_cards(*self.key)
files = [
goal_card.as_file('goals.png'),
stats_card.as_file('stats.png')
]
return MessageArgs(files=files)
async def run(self, interaction: discord.Interaction):
"""
Execute the UI using the given interaction.
"""
self._original = interaction
self._showing_global = False
self.lion = await self.bot.core.lions.fetch(self.guildid, self.userid)
# TODO: Switch to using data cache in reload to calculate global/local
await self.refresh()