Compare commits
1 Commits
feat-simpl
...
feat-count
| Author | SHA1 | Date | |
|---|---|---|---|
| 7a2ad77b91 |
2
src/gui
2
src/gui
Submodule src/gui updated: 62d2484914...40bc140355
@@ -32,7 +32,6 @@ active_discord = [
|
||||
'.shoutouts',
|
||||
'.tagstrings',
|
||||
'.voiceroles',
|
||||
'.hyperfocus',
|
||||
]
|
||||
|
||||
async def setup(bot):
|
||||
|
||||
@@ -15,10 +15,9 @@ from data.queries import ORDER
|
||||
from meta import LionCog, LionBot, CrocBot, LionContext
|
||||
from modules.profiles.community import Community
|
||||
from modules.profiles.profile import UserProfile
|
||||
from utils.lib import utc_now, paginate_list, pager
|
||||
from utils.lib import utc_now
|
||||
from . import logger
|
||||
from .data import CounterData
|
||||
from .graphics.weekly import counter_weekly_card, counter_monthly_card
|
||||
|
||||
|
||||
class PERIOD(Enum):
|
||||
@@ -93,7 +92,7 @@ def counter_cmd_factory(
|
||||
community: Community,
|
||||
args: Optional[str]
|
||||
):
|
||||
await cog.show_lb(ctx, counter, args, author, community, origin)
|
||||
await ctx.reply(await cog.formatted_lb(counter, args, community, origin))
|
||||
|
||||
async def undo_cmd(
|
||||
cog,
|
||||
@@ -135,10 +134,9 @@ class CounterCog(LionCog):
|
||||
|
||||
async def cog_load(self):
|
||||
self._load_twitch_methods(self.crocbot)
|
||||
await self.load_counter_commands()
|
||||
|
||||
await self.data.init()
|
||||
|
||||
await self.load_counter_commands()
|
||||
await self.load_counters()
|
||||
self.loaded.set()
|
||||
|
||||
@@ -196,6 +194,7 @@ class CounterCog(LionCog):
|
||||
self.add_twitch_command(self.crocbot, cmd)
|
||||
for cmd in disc_cmds:
|
||||
# cmd.cog = self
|
||||
self.bot.remove_command(cmd.name)
|
||||
self.bot.add_command(cmd)
|
||||
print(f"Adding command: {cmd}")
|
||||
|
||||
@@ -403,7 +402,8 @@ class CounterCog(LionCog):
|
||||
total = await self.totals(name)
|
||||
await ctx.reply(f"'{name}' counter is now: {total}")
|
||||
elif subcmd == 'lb':
|
||||
await self.show_lb(ctx, name, args or '', author, community, origin=ORIGIN.TWITCH)
|
||||
lbstr = await self.formatted_lb(name, args or '', community)
|
||||
await ctx.reply(lbstr)
|
||||
elif subcmd == 'clear':
|
||||
await self.reset_counter(name)
|
||||
await ctx.reply(f"'{name}' counter reset.")
|
||||
@@ -484,71 +484,24 @@ class CounterCog(LionCog):
|
||||
|
||||
return (period, start_time)
|
||||
|
||||
@cmds.hybrid_command(
|
||||
name='counterlb',
|
||||
description="Show the leaderboard for the given counter."
|
||||
)
|
||||
async def counterlb_dcmd(self, ctx: LionContext, counter: str, period: Optional[str] = None):
|
||||
profiles = self.bot.get_cog('ProfileCog')
|
||||
author = await profiles.fetch_profile_discord(ctx.author)
|
||||
community = await profiles.fetch_community_discord(ctx.guild)
|
||||
await self.show_lb(ctx, counter, period, author, community, ORIGIN.DISCORD)
|
||||
|
||||
@cmds.hybrid_command(
|
||||
name='counterstats',
|
||||
description="Show your stats for the given counter."
|
||||
)
|
||||
async def counterstats_dcmd(self, ctx: LionContext, counter: str, period: Optional[str]=None):
|
||||
profiles = self.bot.get_cog('ProfileCog')
|
||||
author = await profiles.fetch_profile_discord(ctx.author)
|
||||
community = await profiles.fetch_community_discord(ctx.guild)
|
||||
|
||||
if period and period.lower() in ('monthly', 'month'):
|
||||
card = await counter_monthly_card(
|
||||
self.bot,
|
||||
userid=ctx.author.id,
|
||||
profile=author,
|
||||
counter=await self.fetch_counter(counter),
|
||||
guildid=ctx.guild.id,
|
||||
offset=0,
|
||||
)
|
||||
await card.render()
|
||||
await ctx.reply(file=card.as_file('stats.png'))
|
||||
else:
|
||||
card = await counter_weekly_card(
|
||||
self.bot,
|
||||
userid=ctx.author.id,
|
||||
profile=author,
|
||||
counter=await self.fetch_counter(counter),
|
||||
guildid=ctx.guild.id,
|
||||
offset=0,
|
||||
)
|
||||
await card.render()
|
||||
await ctx.reply(file=card.as_file('stats.png'))
|
||||
|
||||
async def show_lb(
|
||||
async def formatted_lb(
|
||||
self,
|
||||
ctx: commands.Context | LionContext,
|
||||
counter: str,
|
||||
periodstr: str,
|
||||
caller: UserProfile,
|
||||
community: Community,
|
||||
origin: ORIGIN = ORIGIN.TWITCH
|
||||
):
|
||||
|
||||
period, start_time = await self.parse_period(community, periodstr)
|
||||
lb = await self.leaderboard(counter, start_time=start_time)
|
||||
name_map = {}
|
||||
for userid in lb.keys():
|
||||
profile = await UserProfile.fetch(self.bot, userid)
|
||||
name = await profile.get_name()
|
||||
name_map[userid] = name
|
||||
|
||||
if not lb:
|
||||
await ctx.reply(
|
||||
f"{counter} {period.value[-1]} leaderboard is empty!"
|
||||
)
|
||||
elif origin is ORIGIN.TWITCH:
|
||||
lb = await self.leaderboard(counter, start_time=start_time)
|
||||
if lb:
|
||||
name_map = {}
|
||||
for userid in lb.keys():
|
||||
profile = await UserProfile.fetch(self.bot, userid)
|
||||
name = await profile.get_name()
|
||||
name_map[userid] = name
|
||||
|
||||
parts = []
|
||||
items = list(lb.items())
|
||||
prefix = 'top 10 ' if len(items) > 10 else ''
|
||||
@@ -558,30 +511,6 @@ class CounterCog(LionCog):
|
||||
part = f"{name}: {total}"
|
||||
parts.append(part)
|
||||
lbstr = '; '.join(parts)
|
||||
await ctx.reply(f"{counter} {period.value[-1]} {prefix}leaderboard --- {lbstr}")
|
||||
elif origin is ORIGIN.DISCORD:
|
||||
title = f"'{counter}' {period.value[-1]} leaderboard"
|
||||
|
||||
lb_strings = []
|
||||
author_index = None
|
||||
max_name_len = min((30, max(len(name) for name in name_map.values())))
|
||||
for i, (uid, total) in enumerate(lb.items()):
|
||||
if author_index is None and uid == caller.profileid:
|
||||
author_index = i
|
||||
lb_strings.append(
|
||||
"{:<{}}\t{:<9}".format(
|
||||
name_map[uid],
|
||||
max_name_len,
|
||||
total,
|
||||
)
|
||||
)
|
||||
|
||||
page_len = 20
|
||||
pages = paginate_list(lb_strings, block_length=page_len, title=title)
|
||||
start_page = author_index // page_len if author_index is not None else 0
|
||||
|
||||
await pager(
|
||||
ctx,
|
||||
pages,
|
||||
start_at=start_page
|
||||
)
|
||||
return f"{counter} {period.value[-1]} {prefix}leaderboard --- {lbstr}"
|
||||
else:
|
||||
return f"{counter} {period.value[-1]} leaderboard is empty!"
|
||||
|
||||
@@ -1,222 +0,0 @@
|
||||
import itertools
|
||||
from typing import Optional
|
||||
from datetime import timedelta, datetime
|
||||
import calendar
|
||||
|
||||
from meta import LionBot
|
||||
from gui.cards import WeeklyStatsCard, MonthlyStatsCard
|
||||
from gui.base import CardMode
|
||||
from modules.profiles.profile import UserProfile
|
||||
from babel import LocalBabel
|
||||
from modules.statistics.lib import apply_month_offset
|
||||
|
||||
from ..data import CounterData
|
||||
|
||||
babel = LocalBabel('counters')
|
||||
_ = babel._
|
||||
|
||||
|
||||
|
||||
async def counter_monthly_card(
|
||||
bot: LionBot,
|
||||
userid: int,
|
||||
profile: UserProfile,
|
||||
counter: CounterData.Counter,
|
||||
guildid: int,
|
||||
offset: int,
|
||||
):
|
||||
cog = bot.get_cog('CounterCog')
|
||||
data: CounterData = cog.data
|
||||
|
||||
if guildid:
|
||||
lion = await bot.core.lions.fetch_member(guildid, userid)
|
||||
user = await lion.fetch_member()
|
||||
else:
|
||||
lion = await bot.core.lions.fetch_user(userid)
|
||||
user = await bot.fetch_user(userid)
|
||||
today = lion.today
|
||||
|
||||
month_start = today.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
|
||||
target = apply_month_offset(month_start, offset)
|
||||
target_end = (target + timedelta(days=40)).replace(day=1, hour=0, minute=0) - timedelta(days=1)
|
||||
|
||||
months = [target]
|
||||
for i in range(0, 3):
|
||||
months.append((months[-1] - timedelta(days=1)).replace(day=1))
|
||||
months.reverse()
|
||||
|
||||
rows = await data.CounterEntry.fetch_where(
|
||||
data.CounterEntry.counterid == counter.counterid,
|
||||
data.CounterEntry.userid == profile.profileid,
|
||||
data.CounterEntry.created_at <= target_end,
|
||||
data.CounterEntry.created_at >= months[0],
|
||||
)
|
||||
|
||||
events = [(row.created_at, row.value) for row in rows]
|
||||
|
||||
month_lengths = [
|
||||
(calendar.monthrange(month.year, month.month)[1]) for month in months
|
||||
]
|
||||
month_dates = []
|
||||
for month, length in zip(months, month_lengths):
|
||||
for day in range(1, length + 1):
|
||||
month_dates.append(datetime(month.year, month.month, day, tzinfo=month.tzinfo))
|
||||
|
||||
monthly_flat = events_to_dayfreq(events, month_dates)
|
||||
print(monthly_flat)
|
||||
|
||||
monthly = []
|
||||
i = 0
|
||||
for length in month_lengths:
|
||||
this_month = monthly_flat[i : i+length]
|
||||
i += length
|
||||
monthly.append(this_month)
|
||||
|
||||
|
||||
skin = await bot.get_cog('CustomSkinCog').get_skinargs_for(
|
||||
guildid, userid, MonthlyStatsCard.card_id
|
||||
)
|
||||
skin |= {
|
||||
'title_text': f"{counter.name.upper()}",
|
||||
'this_month_text': f"THIS MONTH: {{amount}} {counter.name.upper()}",
|
||||
'last_month_text': f"LAST MONTH: {{amount}} {counter.name.upper()}"
|
||||
}
|
||||
|
||||
if user:
|
||||
username = (user.display_name, '')
|
||||
else:
|
||||
username = (await profile.get_name(), '')
|
||||
|
||||
|
||||
card = MonthlyStatsCard(
|
||||
user=username,
|
||||
timezone=str(lion.timezone),
|
||||
now=lion.now.timestamp(),
|
||||
month=int(target.timestamp()),
|
||||
monthly=monthly,
|
||||
current_streak=-1,
|
||||
longest_streak=-1,
|
||||
skin=skin | {'mode': CardMode.TEXT}
|
||||
)
|
||||
return card
|
||||
|
||||
|
||||
|
||||
|
||||
async def counter_weekly_card(
|
||||
bot: LionBot,
|
||||
userid: int,
|
||||
profile: UserProfile,
|
||||
counter: CounterData.Counter,
|
||||
guildid: int,
|
||||
offset: int,
|
||||
):
|
||||
cog = bot.get_cog('CounterCog')
|
||||
data: CounterData = cog.data
|
||||
|
||||
if guildid:
|
||||
lion = await bot.core.lions.fetch_member(guildid, userid)
|
||||
user = await lion.fetch_member()
|
||||
else:
|
||||
lion = await bot.core.lions.fetch_user(userid)
|
||||
user = await bot.fetch_user(userid)
|
||||
today = lion.today
|
||||
week_start = today - timedelta(days=today.weekday()) - timedelta(weeks=offset)
|
||||
days = [week_start + timedelta(i) for i in range(-7, 8 if offset else (today.weekday() + 2))]
|
||||
|
||||
rows = await data.CounterEntry.fetch_where(
|
||||
data.CounterEntry.counterid == counter.counterid,
|
||||
data.CounterEntry.userid == profile.profileid,
|
||||
data.CounterEntry.created_at <= days[-1],
|
||||
data.CounterEntry.created_at >= days[0],
|
||||
)
|
||||
|
||||
events = [(row.created_at, row.value) for row in rows]
|
||||
|
||||
daily = events_to_dayfreq(events, days)
|
||||
sessions = events_to_sessions(next(zip(*events), []))
|
||||
|
||||
skin = await bot.get_cog('CustomSkinCog').get_skinargs_for(
|
||||
guildid, userid, WeeklyStatsCard.card_id
|
||||
)
|
||||
skin |= {
|
||||
'title_text': f"{counter.name.upper()}",
|
||||
'this_week_text': f"THIS WEEK: {{amount}} {counter.name.upper()}",
|
||||
'last_week_text': f"LAST WEEK: {{amount}} {counter.name.upper()}"
|
||||
}
|
||||
|
||||
if user:
|
||||
username = (user.display_name, '')
|
||||
else:
|
||||
username = (await profile.get_name(), '')
|
||||
|
||||
|
||||
card = WeeklyStatsCard(
|
||||
user=username,
|
||||
timezone=str(lion.timezone),
|
||||
now=lion.now.timestamp(),
|
||||
week=week_start.timestamp(),
|
||||
daily=tuple(map(int, daily)),
|
||||
sessions=sessions,
|
||||
skin=skin | {'mode': CardMode.TEXT}
|
||||
)
|
||||
return card
|
||||
|
||||
|
||||
|
||||
def events_to_dayfreq(events: list[tuple[datetime, int]], days: list[datetime]) -> list[int]:
|
||||
if not days:
|
||||
return []
|
||||
|
||||
last_day = 0
|
||||
dayts = 0
|
||||
|
||||
daymap = {}
|
||||
for day in sorted(days, reverse=True):
|
||||
dayts = day.timestamp()
|
||||
last_day = last_day or (day + timedelta(days=1)).timestamp()
|
||||
daymap[dayts] = 0
|
||||
|
||||
first_day = dayts
|
||||
|
||||
for tim, count in events:
|
||||
timts = tim.timestamp()
|
||||
if not first_day < timts < last_day:
|
||||
continue
|
||||
|
||||
for day_start in daymap:
|
||||
if timts > day_start:
|
||||
daymap[day_start] += count
|
||||
break
|
||||
|
||||
return list(reversed(daymap.values()))
|
||||
|
||||
|
||||
def events_to_sessions(event_times: list[datetime]) -> list[tuple[int, int]]:
|
||||
"""
|
||||
Convert a provided list of event times to a session list.
|
||||
"""
|
||||
sessions = []
|
||||
|
||||
session_start = None
|
||||
session_end = None
|
||||
|
||||
SESSION_GAP = 60 * 30
|
||||
SESSION_RADIUS = 60 * 30
|
||||
|
||||
for time in sorted(event_times):
|
||||
if session_start and session_end and (time - session_end).total_seconds() - SESSION_RADIUS > SESSION_GAP:
|
||||
session = (int(session_start.timestamp()), int(session_end.timestamp()))
|
||||
sessions.append(session)
|
||||
session_start = None
|
||||
session_end = None
|
||||
|
||||
if session_start is None:
|
||||
session_start = time - timedelta(seconds=SESSION_RADIUS)
|
||||
session_end = time + timedelta(seconds=SESSION_RADIUS)
|
||||
|
||||
if session_start and session_end:
|
||||
session = (int(session_start.timestamp()), int(session_end.timestamp()))
|
||||
sessions.append(session)
|
||||
|
||||
return sessions
|
||||
@@ -1,8 +0,0 @@
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
from .cog import HyperFocusCog
|
||||
|
||||
async def setup(bot):
|
||||
await bot.add_cog(HyperFocusCog(bot))
|
||||
@@ -1,306 +0,0 @@
|
||||
import asyncio
|
||||
import json
|
||||
from typing import Optional
|
||||
from dataclasses import dataclass
|
||||
|
||||
import twitchio
|
||||
from twitchio.ext import commands
|
||||
from twitchAPI.type import AuthScope
|
||||
import random
|
||||
import datetime as dt
|
||||
from datetime import timedelta, datetime
|
||||
|
||||
from meta import CrocBot, LionCog, LionContext, LionBot
|
||||
from meta.sockets import Channel, register_channel
|
||||
from utils.lib import strfdelta, utc_now
|
||||
from . import logger
|
||||
|
||||
|
||||
@dataclass
|
||||
class FocusState:
|
||||
userid: int | str
|
||||
name: str
|
||||
focus_ends: datetime
|
||||
hyper: bool = True
|
||||
|
||||
|
||||
class FocusChannel(Channel):
|
||||
name = 'FocusList'
|
||||
|
||||
def __init__(self, cog: 'HyperFocusCog', **kwargs):
|
||||
self.cog = cog
|
||||
super().__init__(**kwargs)
|
||||
|
||||
async def on_connection(self, websocket, event):
|
||||
await super().on_connection(websocket, event)
|
||||
await self.reload_focus(websocket=websocket)
|
||||
|
||||
def focus_args(self, state: FocusState):
|
||||
return (
|
||||
state.userid,
|
||||
state.name,
|
||||
state.hyper,
|
||||
state.focus_ends.isoformat(),
|
||||
)
|
||||
|
||||
async def reload_focus(self, websocket=None):
|
||||
"""
|
||||
Clear tasklist and re-send current tasks.
|
||||
"""
|
||||
await self.send_clear(websocket=websocket)
|
||||
for state in self.cog.hyperfocusing.values():
|
||||
await self.send_set(*self.focus_args(state), websocket=websocket)
|
||||
|
||||
async def send_set(self, userid, name, hyper, end_at, websocket=None):
|
||||
await self.send_event({
|
||||
'type': "DO",
|
||||
'method': "setFocus",
|
||||
'args': {
|
||||
'userid': userid,
|
||||
'name': name,
|
||||
'hyper': hyper,
|
||||
'end_at': end_at,
|
||||
}
|
||||
}, websocket=websocket)
|
||||
|
||||
async def send_del(self, userid, websocket=None):
|
||||
await self.send_event({
|
||||
'type': "DO",
|
||||
'method': "delFocus",
|
||||
'args': {
|
||||
'userid': userid,
|
||||
}
|
||||
}, websocket=websocket)
|
||||
|
||||
async def send_clear(self, websocket=None):
|
||||
await self.send_event({
|
||||
'type': "DO",
|
||||
'method': "clearFocus",
|
||||
'args': {
|
||||
}
|
||||
}, websocket=websocket)
|
||||
|
||||
|
||||
|
||||
class HyperFocusCog(LionCog):
|
||||
def __init__(self, bot: CrocBot):
|
||||
self.bot = bot
|
||||
self.crocbot: CrocBot = bot.crocbot
|
||||
|
||||
# userid -> timestamp when they stop
|
||||
self.hyperfocusing: dict[str, FocusState] = {}
|
||||
|
||||
self.channel = FocusChannel(self)
|
||||
register_channel(self.channel.name, self.channel)
|
||||
|
||||
self.loaded = asyncio.Event()
|
||||
|
||||
async def cog_load(self):
|
||||
self._load_twitch_methods(self.crocbot)
|
||||
self.load_hyperfocus()
|
||||
self.loaded.set()
|
||||
|
||||
async def cog_unload(self):
|
||||
self._unload_twitch_methods(self.crocbot)
|
||||
|
||||
def save_hyperfocus(self):
|
||||
with open('hyperfocus.json', 'w', encoding='utf-8') as f:
|
||||
mapped = {
|
||||
userid: {
|
||||
'userid': str(state.userid),
|
||||
'name': state.name,
|
||||
'focus_ends': state.focus_ends.isoformat(),
|
||||
'hyper': state.hyper
|
||||
}
|
||||
for userid, state in self.hyperfocusing.items()
|
||||
}
|
||||
json.dump(mapped, f, ensure_ascii=False, indent=4)
|
||||
|
||||
def load_hyperfocus(self):
|
||||
with open('hyperfocus.json') as f:
|
||||
mapped = json.load(f)
|
||||
self.hyperfocusing.clear()
|
||||
for userid, map in mapped.items():
|
||||
self.hyperfocusing[str(userid)] = FocusState(
|
||||
userid=str(map['userid']),
|
||||
name=map['name'],
|
||||
hyper=map['hyper'],
|
||||
focus_ends=dt.datetime.fromisoformat(map['focus_ends'])
|
||||
)
|
||||
print(f"Loaded hyperfocus: {self.hyperfocusing}")
|
||||
|
||||
def check_hyperfocus(self, userid):
|
||||
"""
|
||||
Returns whether a user is currently in HYPERFOCUS mode!
|
||||
"""
|
||||
return (state := self.hyperfocusing.get(userid, None)) and utc_now() < state.focus_ends
|
||||
|
||||
@commands.Cog.event('event_message')
|
||||
async def on_message(self, message: twitchio.Message):
|
||||
if message.content and message.content.lower() == 'nice':
|
||||
await message.channel.send("That's Nice")
|
||||
|
||||
await self.good_croccy_handler(message)
|
||||
|
||||
tags = message.tags
|
||||
if tags and message.content and self.check_hyperfocus(tags.get('user-id')):
|
||||
if not self.valid_focus_message(message):
|
||||
logger.info(
|
||||
f"Deleting message from hyperfocused user. {message.raw_data=}"
|
||||
)
|
||||
await asyncio.sleep(1)
|
||||
msgid = tags['id']
|
||||
# TODO: Better selection for moderator
|
||||
# i.e. if the message is not from the broadcaster and we do have delete perms
|
||||
# then use our own token.
|
||||
broadcasterid = tags['room-id']
|
||||
authcog = self.bot.get_cog('TwitchAuthCog')
|
||||
if not await authcog.check_auth(broadcasterid, scopes=[AuthScope.MODERATOR_MANAGE_CHAT_MESSAGES]):
|
||||
await message.channel.send(f"@{message.author.name} Stay focused! (I tried to delete your message because you are in !hyperfocus. Unfortunately I don't have the permissions to do that. But stay focused anyway!)")
|
||||
else:
|
||||
twitch = await authcog.fetch_client_for(broadcasterid)
|
||||
await twitch.delete_chat_message(
|
||||
broadcasterid,
|
||||
broadcasterid,
|
||||
msgid,
|
||||
)
|
||||
await message.channel.send(
|
||||
f"@{message.author.name} Stay focused! (I deleted your message because you are in !hyperfocus, use !unfocus to come back.)"
|
||||
)
|
||||
|
||||
async def good_croccy_handler(self, message: twitchio.Message):
|
||||
if not message.content:
|
||||
return
|
||||
cleaned = message.content.lower().replace('@croccyhelper', '').strip()
|
||||
if cleaned in ('good croc', 'good croccy', 'good helper'):
|
||||
await message.channel.send("holono1Heart")
|
||||
elif cleaned in ('bad croc', 'bad croccy', 'bad helper'):
|
||||
await message.channel.send("holono1Sad")
|
||||
|
||||
async def chemical_handler(self, message: twitchio.Message):
|
||||
if not message.content:
|
||||
return
|
||||
cleaned = message.content.lower().strip()
|
||||
if cleaned in ('oh',):
|
||||
await message.channel.send('Oxygen Hydrogen!')
|
||||
|
||||
def valid_focus_message(self, message: twitchio.Message) -> bool:
|
||||
"""
|
||||
Determined whether the given message is allowed to be sent in !hyperfocus.
|
||||
That is, if it appears to be emote-only or a command.
|
||||
"""
|
||||
|
||||
content = message.content
|
||||
if not content:
|
||||
return True
|
||||
|
||||
tags = message.tags or {}
|
||||
to_remove = []
|
||||
|
||||
if (replying := tags.get('reply-parent-user-login', '')) and content.startswith('@'):
|
||||
# Trim the mention from the start of the content
|
||||
splits = content.split(maxsplit=1)
|
||||
to_remove.append((0, len(splits[0])))
|
||||
|
||||
if emotesstr := tags.get('emotes', ''):
|
||||
for emotestr in emotesstr.split('/'):
|
||||
emote, locs = emotestr.split(':')
|
||||
for loc in locs.split(','):
|
||||
start, end = loc.split('-')
|
||||
to_remove.append((int(start), int(end) + 1))
|
||||
|
||||
# Sort the pairs to remove by descending starting index
|
||||
# This should allow clean removal with a loop as long as there are no intersections.
|
||||
to_remove.sort(key=lambda pair: pair[0], reverse=True)
|
||||
for start, end in to_remove:
|
||||
content = content[:start] + content[end:]
|
||||
content = content.strip().replace(' ', '').replace('\n', '')
|
||||
allowed = not content or content.startswith('!') or content.startswith('*')
|
||||
allowed = allowed or all(not char.isascii() for char in content)
|
||||
|
||||
if not allowed:
|
||||
logger.info(f"Invalid hyperfocus message. Trimmed content: {content}")
|
||||
|
||||
return allowed
|
||||
|
||||
@commands.command(name='coinflip')
|
||||
async def coinflip(self, ctx):
|
||||
await ctx.reply(random.choice(('heads', 'tails')))
|
||||
|
||||
@commands.command(name='choose')
|
||||
async def choose(self, ctx, *, args: str):
|
||||
if not args:
|
||||
await ctx.reply("Give me something to choose, e.g. !choose Heads | Tails")
|
||||
else:
|
||||
options = args.split('|')
|
||||
options = [option.strip() for option in options]
|
||||
options = [option for option in options if option]
|
||||
choice = random.choice(options)
|
||||
if random.random() < 0.01:
|
||||
choice = "You"
|
||||
await ctx.reply(f"I choose: {choice}")
|
||||
|
||||
@commands.command(name='hyperfocus')
|
||||
async def hyperfocus_cmd(self, ctx, dur: Optional[int] = None):
|
||||
userid = str(ctx.author.id)
|
||||
now = utc_now()
|
||||
end_time = None
|
||||
|
||||
if dur is None:
|
||||
# Automatically select time
|
||||
next_hour = now.replace(minute=0, second=0, microsecond=0) + dt.timedelta(hours=1)
|
||||
next_block = next_hour - dt.timedelta(minutes=10)
|
||||
if now > next_block:
|
||||
# Currently in the break
|
||||
next_block = next_block + dt.timedelta(hours=1)
|
||||
end_time = next_block
|
||||
dur = int((end_time - now).total_seconds() // 60)
|
||||
elif dur > 720:
|
||||
await ctx.reply("You can hyperfocus for at most 12 hours at a time!")
|
||||
else:
|
||||
end_time = utc_now() + dt.timedelta(minutes=dur)
|
||||
|
||||
if end_time is not None:
|
||||
state = self.hyperfocusing[userid] = FocusState(
|
||||
userid=userid,
|
||||
name=ctx.author.display_name,
|
||||
focus_ends=end_time,
|
||||
)
|
||||
self.save_hyperfocus()
|
||||
await self.channel.send_set(*self.channel.focus_args(state))
|
||||
await ctx.reply(
|
||||
f"{ctx.author.name} has gone into HYPERFOCUS mode! "
|
||||
f"They will be in emote and command only mode for the next {dur} minutes! "
|
||||
"Use !unfocus if you really need to chat before then, best of luck! 🍀"
|
||||
)
|
||||
|
||||
@commands.command(name='unfocus')
|
||||
async def unfocus_cmd(self, ctx):
|
||||
self.hyperfocusing.pop(ctx.author.id, None)
|
||||
self.save_hyperfocus()
|
||||
await self.channel.send_del(ctx.author.id)
|
||||
await ctx.reply("Welcome back from focus, hope it went well! Have a comfy break and remember to have a sippie and a stretch~")
|
||||
|
||||
@commands.command(name='hyperfocused')
|
||||
async def focused_cmd(self, ctx, user: Optional[twitchio.User] = None):
|
||||
user = user if user is not None else ctx.author
|
||||
userid = str(user.id)
|
||||
if self.check_hyperfocus(userid):
|
||||
state = self.hyperfocusing.get(userid)
|
||||
end_time = state.focus_ends
|
||||
durstr = strfdelta(end_time - utc_now())
|
||||
await ctx.reply(
|
||||
f"{user.name} is in HYPERFOCUS for another {durstr}! "
|
||||
"They can only write emojis and commands in this time~ "
|
||||
"(use !unfocus to come back if you need to!) "
|
||||
"Good luck!"
|
||||
)
|
||||
elif userid != str(ctx.author.id):
|
||||
await ctx.reply(
|
||||
f"{user.name} is not hyperfocused!"
|
||||
)
|
||||
else:
|
||||
await ctx.reply(
|
||||
"You are not hyperfocused! "
|
||||
"Enter HYPERFOCUS mode for e.g. 10 minutes by writing !hyperfocus 10"
|
||||
)
|
||||
@@ -38,7 +38,7 @@ class UserProfile:
|
||||
twitches = await self.twitch_accounts()
|
||||
if twitches:
|
||||
users = await self.bot.crocbot.fetch_users(
|
||||
ids=[int(twitches[0].userid)]
|
||||
ids=[int(twitch.userid) for twitch in twitches]
|
||||
)
|
||||
if users:
|
||||
user = users[0]
|
||||
@@ -86,21 +86,13 @@ class UserProfile:
|
||||
"""
|
||||
Fetch the Discord accounts associated to this profile.
|
||||
"""
|
||||
return await self.data.DiscordProfileRow.fetch_where(
|
||||
profileid=self.profileid
|
||||
).order_by(
|
||||
'created_at'
|
||||
)
|
||||
return await self.data.DiscordProfileRow.fetch_where(profileid=self.profileid)
|
||||
|
||||
async def twitch_accounts(self) -> list[ProfileData.TwitchProfileRow]:
|
||||
"""
|
||||
Fetch the Twitch accounts associated to this profile.
|
||||
"""
|
||||
return await self.data.TwitchProfileRow.fetch_where(
|
||||
profileid=self.profileid
|
||||
).order_by(
|
||||
'created_at'
|
||||
)
|
||||
return await self.data.TwitchProfileRow.fetch_where(profileid=self.profileid)
|
||||
|
||||
@classmethod
|
||||
async def fetch(cls, bot: LionBot, profile_id: int) -> Self:
|
||||
|
||||
@@ -8,16 +8,9 @@ async def get_leaderboard_card(
|
||||
bot: LionBot, highlightid: int, guildid: int,
|
||||
mode: CardMode,
|
||||
entry_data: list[tuple[int, int, int]], # userid, position, time
|
||||
name_map: dict[int, str] = {},
|
||||
extra_skin_args = {},
|
||||
):
|
||||
"""
|
||||
Render a leaderboard card with given parameters.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name_map: dict[int, str]
|
||||
Map of userid -> name, used first before cache or fetch.
|
||||
"""
|
||||
guild = bot.get_guild(guildid)
|
||||
if guild is None:
|
||||
@@ -27,12 +20,8 @@ async def get_leaderboard_card(
|
||||
avatars = {}
|
||||
names = {}
|
||||
missing = []
|
||||
|
||||
for userid, _, _ in entry_data:
|
||||
if (name := name_map.get(userid, None)):
|
||||
avatars[userid] = None
|
||||
names[userid] = name
|
||||
elif guild and (member := guild.get_member(userid)):
|
||||
if guild and (member := guild.get_member(userid)):
|
||||
avatars[userid] = member.avatar.key if member.avatar else None
|
||||
names[userid] = member.display_name
|
||||
elif (user := bot.get_user(userid)):
|
||||
@@ -76,7 +65,7 @@ async def get_leaderboard_card(
|
||||
guildid, None, LeaderboardCard.card_id
|
||||
)
|
||||
card = LeaderboardCard(
|
||||
skin=skin | {'mode': mode} | extra_skin_args,
|
||||
skin=skin | {'mode': mode},
|
||||
server_name=guild.name,
|
||||
entries=entries,
|
||||
highlight=highlight
|
||||
|
||||
@@ -29,26 +29,13 @@ class TwitchAuthCog(LionCog):
|
||||
self.bot = bot
|
||||
self.data = bot.db.load_registry(TwitchAuthData())
|
||||
|
||||
self.client_cache = {}
|
||||
|
||||
async def cog_load(self):
|
||||
await self.data.init()
|
||||
|
||||
# ----- Auth API -----
|
||||
|
||||
async def fetch_client_for(self, userid: str):
|
||||
authrow = await self.data.UserAuthRow.fetch(userid)
|
||||
if authrow is None:
|
||||
# TODO: Some user authentication error
|
||||
self.client_cache.pop(userid, None)
|
||||
raise ValueError("Requested user is not authenticated.")
|
||||
if (twitch := self.client_cache.get(userid)) is None:
|
||||
twitch = await Twitch(self.bot.config.twitch['app_id'], self.bot.config.twitch['app_secret'])
|
||||
scopes = await self.data.UserAuthRow.get_scopes_for(userid)
|
||||
authscopes = [AuthScope(scope) for scope in scopes]
|
||||
await twitch.set_user_authentication(authrow.access_token, authscopes, authrow.refresh_token)
|
||||
self.client_cache[userid] = twitch
|
||||
return twitch
|
||||
async def fetch_client_for(self, userid: int):
|
||||
...
|
||||
|
||||
async def check_auth(self, userid: str, scopes: list[AuthScope] = []) -> bool:
|
||||
"""
|
||||
@@ -59,9 +46,7 @@ class TwitchAuthCog(LionCog):
|
||||
if authrow:
|
||||
if scopes:
|
||||
has_scopes = await self.data.UserAuthRow.get_scopes_for(userid)
|
||||
desired = {scope.value for scope in scopes}
|
||||
has_auth = desired.issubset(has_scopes)
|
||||
logger.info(f"Auth check for `{userid}`: Requested scopes {desired}, has scopes {has_scopes}. Passed: {has_auth}")
|
||||
has_auth = set(map(str, scopes)).issubset(has_scopes)
|
||||
else:
|
||||
has_auth = True
|
||||
else:
|
||||
@@ -73,7 +58,6 @@ class TwitchAuthCog(LionCog):
|
||||
Start the user authentication flow for the given userid.
|
||||
Will request the given scopes along with the default ones and any existing scopes.
|
||||
"""
|
||||
self.client_cache.pop(userid, None)
|
||||
existing_strs = await self.data.UserAuthRow.get_scopes_for(userid)
|
||||
existing = map(AuthScope, existing_strs)
|
||||
to_request = set(existing).union(scopes)
|
||||
@@ -98,17 +82,3 @@ class TwitchAuthCog(LionCog):
|
||||
await ctx.reply(flow.auth.return_auth_url())
|
||||
await flow.run()
|
||||
await ctx.reply("Authentication Complete!")
|
||||
|
||||
@cmds.hybrid_command(name='modauth')
|
||||
async def cmd_modauth(self, ctx: LionContext):
|
||||
if ctx.interaction:
|
||||
await ctx.interaction.response.defer(ephemeral=True)
|
||||
scopes = [
|
||||
AuthScope.MODERATOR_READ_FOLLOWERS,
|
||||
AuthScope.CHANNEL_READ_REDEMPTIONS,
|
||||
AuthScope.MODERATOR_MANAGE_CHAT_MESSAGES,
|
||||
]
|
||||
flow = await self.start_auth(scopes=scopes)
|
||||
await ctx.reply(flow.auth.return_auth_url())
|
||||
await flow.run()
|
||||
await ctx.reply("Authentication Complete!")
|
||||
|
||||
@@ -64,7 +64,7 @@ class TwitchAuthData(Registry):
|
||||
"""
|
||||
rows = await TwitchAuthData.user_scopes.select_where(userid=userid)
|
||||
|
||||
return [row['scope'] for row in rows] if rows else []
|
||||
return [row.scope for row in rows] if rows else []
|
||||
|
||||
|
||||
"""
|
||||
|
||||
124
src/utils/lib.py
124
src/utils/lib.py
@@ -7,7 +7,6 @@ import iso8601 # type: ignore
|
||||
import pytz
|
||||
import re
|
||||
import json
|
||||
import asyncio
|
||||
from contextvars import Context
|
||||
|
||||
import discord
|
||||
@@ -919,126 +918,3 @@ def write_records(records: list[dict[str, Any]], stream: StringIO):
|
||||
for record in records:
|
||||
stream.write(','.join(map(str, record.values())))
|
||||
stream.write('\n')
|
||||
|
||||
|
||||
async def pager(ctx, pages, locked=True, start_at=0, add_cancel=False, **kwargs):
|
||||
"""
|
||||
Shows the user each page from the provided list `pages` one at a time,
|
||||
providing reactions to page back and forth between pages.
|
||||
This is done asynchronously, and returns after displaying the first page.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
pages: List(Union(str, discord.Embed))
|
||||
A list of either strings or embeds to display as the pages.
|
||||
locked: bool
|
||||
Whether only the `ctx.author` should be able to use the paging reactions.
|
||||
kwargs: ...
|
||||
Remaining keyword arguments are transparently passed to the reply context method.
|
||||
|
||||
Returns: discord.Message
|
||||
This is the output message, returned for easy deletion.
|
||||
"""
|
||||
cancel_emoji = cross
|
||||
# Handle broken input
|
||||
if len(pages) == 0:
|
||||
raise ValueError("Pager cannot page with no pages!")
|
||||
|
||||
# Post first page. Method depends on whether the page is an embed or not.
|
||||
if isinstance(pages[start_at], discord.Embed):
|
||||
out_msg = await ctx.reply(embed=pages[start_at], **kwargs)
|
||||
else:
|
||||
out_msg = await ctx.reply(pages[start_at], **kwargs)
|
||||
|
||||
# Run the paging loop if required
|
||||
if len(pages) > 1:
|
||||
task = asyncio.create_task(_pager(ctx, out_msg, pages, locked, start_at, add_cancel, **kwargs))
|
||||
# ctx.tasks.append(task)
|
||||
elif add_cancel:
|
||||
await out_msg.add_reaction(cancel_emoji)
|
||||
|
||||
# Return the output message
|
||||
return out_msg
|
||||
|
||||
|
||||
async def _pager(ctx, out_msg, pages, locked, start_at, add_cancel, **kwargs):
|
||||
"""
|
||||
Asynchronous initialiser and loop for the `pager` utility above.
|
||||
"""
|
||||
# Page number
|
||||
page = start_at
|
||||
|
||||
# Add reactions to the output message
|
||||
next_emoji = "▶"
|
||||
prev_emoji = "◀"
|
||||
cancel_emoji = cross
|
||||
|
||||
try:
|
||||
await out_msg.add_reaction(prev_emoji)
|
||||
if add_cancel:
|
||||
await out_msg.add_reaction(cancel_emoji)
|
||||
await out_msg.add_reaction(next_emoji)
|
||||
except discord.Forbidden:
|
||||
# We don't have permission to add paging emojis
|
||||
# Die as gracefully as we can
|
||||
if ctx.guild:
|
||||
perms = ctx.channel.permissions_for(ctx.guild.me)
|
||||
if not perms.add_reactions:
|
||||
await ctx.error_reply(
|
||||
"Cannot page results because I do not have the `add_reactions` permission!"
|
||||
)
|
||||
elif not perms.read_message_history:
|
||||
await ctx.error_reply(
|
||||
"Cannot page results because I do not have the `read_message_history` permission!"
|
||||
)
|
||||
else:
|
||||
await ctx.error_reply(
|
||||
"Cannot page results due to insufficient permissions!"
|
||||
)
|
||||
else:
|
||||
await ctx.error_reply(
|
||||
"Cannot page results!"
|
||||
)
|
||||
return
|
||||
|
||||
# Check function to determine whether a reaction is valid
|
||||
def check(reaction, user):
|
||||
result = reaction.message.id == out_msg.id
|
||||
result = result and str(reaction.emoji) in [next_emoji, prev_emoji]
|
||||
result = result and not (user.id == ctx.bot.user.id)
|
||||
result = result and not (locked and user != ctx.author)
|
||||
return result
|
||||
|
||||
# Begin loop
|
||||
while True:
|
||||
# Wait for a valid reaction, break if we time out
|
||||
try:
|
||||
reaction, user = await ctx.bot.wait_for('reaction_add', check=check, timeout=300)
|
||||
except asyncio.TimeoutError:
|
||||
break
|
||||
|
||||
# Attempt to remove the user's reaction, silently ignore errors
|
||||
asyncio.ensure_future(out_msg.remove_reaction(reaction.emoji, user))
|
||||
|
||||
# Change the page number
|
||||
page += 1 if reaction.emoji == next_emoji else -1
|
||||
page %= len(pages)
|
||||
|
||||
# Edit the message with the new page
|
||||
active_page = pages[page]
|
||||
if isinstance(active_page, discord.Embed):
|
||||
await out_msg.edit(embed=active_page, **kwargs)
|
||||
else:
|
||||
await out_msg.edit(content=active_page, **kwargs)
|
||||
|
||||
# Clean up by removing the reactions
|
||||
try:
|
||||
await out_msg.clear_reactions()
|
||||
except discord.Forbidden:
|
||||
try:
|
||||
await out_msg.remove_reaction(next_emoji, ctx.client.user)
|
||||
await out_msg.remove_reaction(prev_emoji, ctx.client.user)
|
||||
except discord.NotFound:
|
||||
pass
|
||||
except discord.NotFound:
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user