15 Commits

19 changed files with 1219 additions and 101 deletions

Submodule src/gui updated: 40bc140355...62d2484914

View File

@@ -32,6 +32,7 @@ active_discord = [
'.shoutouts',
'.tagstrings',
'.voiceroles',
'.hyperfocus',
]
async def setup(bot):

View File

@@ -5,15 +5,20 @@ from datetime import timedelta
import discord
from discord.ext import commands as cmds
from discord import app_commands as appcmds
import twitchio
from twitchio.ext import commands
from data.queries import ORDER
from meta import LionCog, LionBot, CrocBot
from utils.lib import utc_now
from meta import LionCog, LionBot, CrocBot, LionContext
from modules.profiles.community import Community
from modules.profiles.profile import UserProfile
from utils.lib import utc_now, paginate_list, pager
from . import logger
from .data import CounterData
from .graphics.weekly import counter_weekly_card, counter_monthly_card
class PERIOD(Enum):
@@ -25,6 +30,11 @@ class PERIOD(Enum):
YEAR = ('this year', 'y', 'year', 'yearly')
class ORIGIN(Enum):
DISCORD = 'discord'
TWITCH = 'twitch'
def counter_cmd_factory(
counter: str,
response: str,
@@ -32,10 +42,16 @@ def counter_cmd_factory(
context: Optional[str] = None
):
context = context or f"cmd: {counter}"
async def counter_cmd(cog, ctx: commands.Context, *, args: Optional[str] = None):
userid = int(ctx.author.id)
channelid = int((await ctx.channel.user()).id)
period, start_time = await cog.parse_period(channelid, '', default=default_period)
async def counter_cmd(
cog,
ctx: commands.Context | LionContext,
origin: ORIGIN,
author: UserProfile,
community: Community,
args: Optional[str]
):
userid = author.profileid
period, start_time = await cog.parse_period(community, '', default=default_period)
args = (args or '').strip(" 󠀀 ")
splits = args.split(maxsplit=1)
@@ -69,13 +85,25 @@ def counter_cmd_factory(
)
)
async def lb_cmd(cog, ctx: commands.Context, *, args: str = ''):
user = await ctx.channel.user()
await ctx.reply(await cog.formatted_lb(counter, args, int(user.id)))
async def lb_cmd(
cog,
ctx: commands.Context | LionContext,
origin: ORIGIN,
author: UserProfile,
community: Community,
args: Optional[str]
):
await cog.show_lb(ctx, counter, args, author, community, origin)
async def undo_cmd(cog, ctx: commands.Context):
userid = int(ctx.author.id)
channelid = int((await ctx.channel.user()).id)
async def undo_cmd(
cog,
ctx: commands.Context | LionContext,
origin: ORIGIN,
author: UserProfile,
community: Community,
args: Optional[str]
):
userid = author.profileid
_counter = await cog.fetch_counter(counter)
query = cog.data.CounterEntry.fetch_where(
counterid=_counter.counterid,
@@ -107,12 +135,16 @@ class CounterCog(LionCog):
async def cog_load(self):
self._load_twitch_methods(self.crocbot)
await self.load_counter_commands()
await self.data.init()
await self.load_counter_commands()
await self.load_counters()
self.loaded.set()
profiles = self.bot.get_cog('ProfileCog')
profiles.add_profile_migrator(self.migrate_profiles, name='counters')
async def cog_unload(self):
self._unload_twitch_methods(self.crocbot)
@@ -124,18 +156,48 @@ class CounterCog(LionCog):
counter.name,
row.response
)
cmds = []
main_cmd = commands.command(name=row.name)(counter_cb)
cmds.append(main_cmd)
if row.lbname:
lb_cmd = commands.command(name=row.lbname)(lb_cb)
cmds.append(lb_cmd)
if row.undoname:
undo_cmd = commands.command(name=row.undoname)(undo_cb)
cmds.append(undo_cmd)
twitch_cmds = []
disc_cmds = []
twitch_cmds.append(
commands.command(
name=row.name
)(self.twitch_callback(counter_cb))
)
disc_cmds.append(
cmds.hybrid_command(
name=row.name
)(self.discord_callback(counter_cb))
)
for cmd in cmds:
if row.lbname:
twitch_cmds.append(
commands.command(
name=row.lbname
)(self.twitch_callback(lb_cb))
)
disc_cmds.append(
cmds.hybrid_command(
name=row.lbname
)(self.discord_callback(lb_cb))
)
if row.undoname:
twitch_cmds.append(
commands.command(
name=row.undoname
)(self.twitch_callback(undo_cb))
)
disc_cmds.append(
cmds.hybrid_command(
name=row.undoname
)(self.discord_callback(undo_cb))
)
for cmd in twitch_cmds:
self.add_twitch_command(self.crocbot, cmd)
for cmd in disc_cmds:
# cmd.cog = self
self.bot.add_command(cmd)
print(f"Adding command: {cmd}")
logger.info(f"(Re)Loaded {len(rows)} counter commands!")
@@ -152,6 +214,87 @@ class CounterCog(LionCog):
f"Loaded {len(self.counters)} counters."
)
async def migrate_profiles(self, source_profile: UserProfile, target_profile: UserProfile):
"""
Move source profile entries to target profile entries
"""
results = ["(Counters)"]
rows = await self.data.CounterEntry.table.update_where(userid=source_profile.profileid).set(userid=target_profile.profileid)
if rows:
results.append(
f"Migrated {len(rows)} counter entries from source profile."
)
else:
results.append(
"No counter entries to migrate in source profile."
)
return ' '.join(results)
async def user_profile_migration(self):
"""
Manual single-use migration method from the old userid format to the new profileid format.
"""
async with self.bot.db.connection() as conn:
self.bot.db.conn = conn
async with conn.transaction():
entries = await self.data.CounterEntry.fetch_where()
for entry in entries:
if entry.userid > 1000:
# Assume userid is a twitch userid
profile = await UserProfile.fetch_from_twitchid(self.bot, entry.userid)
if not profile:
# Need to create
users = await self.crocbot.fetch_users(ids=[entry.userid])
if not users:
continue
user = users[0]
profile = await UserProfile.create_from_twitch(self.bot, user)
await entry.update(userid=profile.profileid)
logger.info("Completed single-shot user profile migration")
# General API
def twitch_callback(self, callback):
"""
Generate a Twitch command callback from the given general callback.
General callback must be of the form
callback(cog, ctx: GeneralContext, origin: ORIGIN, author: Profile, comm: Community, args: Optional[str])
Return will be a command callback of the form
callback(cog, ctx: Context, *, args: Optional[str] = None)
"""
async def command_callback(cog: CounterCog, ctx: commands.Context, *, args: Optional[str] = None):
profiles = cog.bot.get_cog('ProfileCog')
# Compute author profile
author = await profiles.fetch_profile_twitch(ctx.author)
# Compute community profile
community = await profiles.fetch_community_twitch(await ctx.channel.user())
return await callback(cog, ctx, ORIGIN.TWITCH, author, community, args)
return command_callback
def discord_callback(self, callback):
"""
Generate a Discord command callback from the given general callback.
General callback must be of the form
callback(cog, ctx: GeneralContext, origin: ORIGIN, author: Profile, comm: Community, args: Optional[str])
Return will be a command callback of the form
callback(cog, ctx: LionContext, *, args: Optional[str] = None)
"""
cog = self
async def command_callback(ctx: LionContext, *, args: Optional[str] = None):
profiles = cog.bot.get_cog('ProfileCog')
# Compute author profile
author = await profiles.fetch_profile_discord(ctx.author)
# Compute community profile
community = await profiles.fetch_community_discord(ctx.guild)
return await callback(cog, ctx, ORIGIN.DISCORD, author, community, args)
return command_callback
# Counters API
async def fetch_counter(self, counter: str) -> CounterData.Counter:
@@ -218,6 +361,14 @@ class CounterCog(LionCog):
results = await query
return results[0]['counter_total'] if results else 0
# Manage commands
@commands.command()
async def countermigration(self, ctx: commands.Context, *, args: Optional[str]=None):
if not (ctx.author.is_mod or ctx.author.is_broadcaster):
return
await self.user_profile_migration()
await ctx.reply("Counter userid->profileid migration done.")
# Counters commands
@commands.command()
async def counter(self, ctx: commands.Context, name: str, subcmd: Optional[str], *, args: Optional[str]=None):
@@ -225,6 +376,10 @@ class CounterCog(LionCog):
return
name = name.lower()
profiles = self.bot.get_cog('ProfileCog')
author = await profiles.fetch_profile_twitch(ctx.author)
userid = author.profileid
community = await profiles.fetch_community_twitch(await ctx.channel.user())
if subcmd is None or subcmd == 'show':
# Show
@@ -241,16 +396,14 @@ class CounterCog(LionCog):
return
await self.add_to_counter(
name,
int(ctx.author.id),
userid,
value,
context='cmd: counter add'
)
total = await self.totals(name)
await ctx.reply(f"'{name}' counter is now: {total}")
elif subcmd == 'lb':
user = await ctx.channel.user()
lbstr = await self.formatted_lb(name, args or '', int(user.id))
await ctx.reply(lbstr)
await self.show_lb(ctx, name, args or '', author, community, origin=ORIGIN.TWITCH)
elif subcmd == 'clear':
await self.reset_counter(name)
await ctx.reply(f"'{name}' counter reset.")
@@ -292,7 +445,7 @@ class CounterCog(LionCog):
else:
await ctx.reply(f"Unrecognised subcommand {subcmd}. Supported subcommands: 'show', 'add', 'lb', 'clear', 'alias'.")
async def parse_period(self, userid: int, periodstr: str, default=PERIOD.STREAM):
async def parse_period(self, community: Community, periodstr: str, default=PERIOD.STREAM):
if periodstr:
period = next((period for period in PERIOD if periodstr.lower() in period.value), None)
if period is None:
@@ -306,9 +459,13 @@ class CounterCog(LionCog):
if period is PERIOD.ALL:
start_time = None
elif period is PERIOD.STREAM:
streams = await self.crocbot.fetch_streams(user_ids=[userid])
if streams:
stream = streams[0]
twitches = await community.twitch_channels()
stream = None
if twitches:
twitch = twitches[0]
streams = await self.crocbot.fetch_streams(user_ids=[int(twitch.channelid)])
stream = streams[0] if streams else None
if stream:
start_time = stream.started_at
else:
period = PERIOD.ALL
@@ -327,21 +484,104 @@ class CounterCog(LionCog):
return (period, start_time)
async def formatted_lb(self, counter: str, periodstr: str, channelid: int):
@cmds.hybrid_command(
name='counterlb',
description="Show the leaderboard for the given counter."
)
async def counterlb_dcmd(self, ctx: LionContext, counter: str, period: Optional[str] = None):
profiles = self.bot.get_cog('ProfileCog')
author = await profiles.fetch_profile_discord(ctx.author)
community = await profiles.fetch_community_discord(ctx.guild)
await self.show_lb(ctx, counter, period, author, community, ORIGIN.DISCORD)
period, start_time = await self.parse_period(channelid, periodstr)
@cmds.hybrid_command(
name='counterstats',
description="Show your stats for the given counter."
)
async def counterstats_dcmd(self, ctx: LionContext, counter: str, period: Optional[str]=None):
profiles = self.bot.get_cog('ProfileCog')
author = await profiles.fetch_profile_discord(ctx.author)
community = await profiles.fetch_community_discord(ctx.guild)
if period and period.lower() in ('monthly', 'month'):
card = await counter_monthly_card(
self.bot,
userid=ctx.author.id,
profile=author,
counter=await self.fetch_counter(counter),
guildid=ctx.guild.id,
offset=0,
)
await card.render()
await ctx.reply(file=card.as_file('stats.png'))
else:
card = await counter_weekly_card(
self.bot,
userid=ctx.author.id,
profile=author,
counter=await self.fetch_counter(counter),
guildid=ctx.guild.id,
offset=0,
)
await card.render()
await ctx.reply(file=card.as_file('stats.png'))
async def show_lb(
self,
ctx: commands.Context | LionContext,
counter: str,
periodstr: str,
caller: UserProfile,
community: Community,
origin: ORIGIN = ORIGIN.TWITCH
):
period, start_time = await self.parse_period(community, periodstr)
lb = await self.leaderboard(counter, start_time=start_time)
if lb:
userids = list(lb.keys())
users = await self.crocbot.fetch_users(ids=userids)
name_map = {user.id: user.display_name for user in users}
name_map = {}
for userid in lb.keys():
profile = await UserProfile.fetch(self.bot, userid)
name = await profile.get_name()
name_map[userid] = name
if not lb:
await ctx.reply(
f"{counter} {period.value[-1]} leaderboard is empty!"
)
elif origin is ORIGIN.TWITCH:
parts = []
for userid, total in lb.items():
items = list(lb.items())
prefix = 'top 10 ' if len(items) > 10 else ''
items = items[:10]
for userid, total in items:
name = name_map.get(userid, str(userid))
part = f"{name}: {total}"
parts.append(part)
lbstr = '; '.join(parts)
return f"{counter} {period.value[-1]} leaderboard --- {lbstr}"
else:
return f"{counter} {period.value[-1]} leaderboard is empty!"
await ctx.reply(f"{counter} {period.value[-1]} {prefix}leaderboard --- {lbstr}")
elif origin is ORIGIN.DISCORD:
title = f"'{counter}' {period.value[-1]} leaderboard"
lb_strings = []
author_index = None
max_name_len = min((30, max(len(name) for name in name_map.values())))
for i, (uid, total) in enumerate(lb.items()):
if author_index is None and uid == caller.profileid:
author_index = i
lb_strings.append(
"{:<{}}\t{:<9}".format(
name_map[uid],
max_name_len,
total,
)
)
page_len = 20
pages = paginate_list(lb_strings, block_length=page_len, title=title)
start_page = author_index // page_len if author_index is not None else 0
await pager(
ctx,
pages,
start_at=start_page
)

View File

View File

@@ -0,0 +1,222 @@
import itertools
from typing import Optional
from datetime import timedelta, datetime
import calendar
from meta import LionBot
from gui.cards import WeeklyStatsCard, MonthlyStatsCard
from gui.base import CardMode
from modules.profiles.profile import UserProfile
from babel import LocalBabel
from modules.statistics.lib import apply_month_offset
from ..data import CounterData
babel = LocalBabel('counters')
_ = babel._
async def counter_monthly_card(
bot: LionBot,
userid: int,
profile: UserProfile,
counter: CounterData.Counter,
guildid: int,
offset: int,
):
cog = bot.get_cog('CounterCog')
data: CounterData = cog.data
if guildid:
lion = await bot.core.lions.fetch_member(guildid, userid)
user = await lion.fetch_member()
else:
lion = await bot.core.lions.fetch_user(userid)
user = await bot.fetch_user(userid)
today = lion.today
month_start = today.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
target = apply_month_offset(month_start, offset)
target_end = (target + timedelta(days=40)).replace(day=1, hour=0, minute=0) - timedelta(days=1)
months = [target]
for i in range(0, 3):
months.append((months[-1] - timedelta(days=1)).replace(day=1))
months.reverse()
rows = await data.CounterEntry.fetch_where(
data.CounterEntry.counterid == counter.counterid,
data.CounterEntry.userid == profile.profileid,
data.CounterEntry.created_at <= target_end,
data.CounterEntry.created_at >= months[0],
)
events = [(row.created_at, row.value) for row in rows]
month_lengths = [
(calendar.monthrange(month.year, month.month)[1]) for month in months
]
month_dates = []
for month, length in zip(months, month_lengths):
for day in range(1, length + 1):
month_dates.append(datetime(month.year, month.month, day, tzinfo=month.tzinfo))
monthly_flat = events_to_dayfreq(events, month_dates)
print(monthly_flat)
monthly = []
i = 0
for length in month_lengths:
this_month = monthly_flat[i : i+length]
i += length
monthly.append(this_month)
skin = await bot.get_cog('CustomSkinCog').get_skinargs_for(
guildid, userid, MonthlyStatsCard.card_id
)
skin |= {
'title_text': f"{counter.name.upper()}",
'this_month_text': f"THIS MONTH: {{amount}} {counter.name.upper()}",
'last_month_text': f"LAST MONTH: {{amount}} {counter.name.upper()}"
}
if user:
username = (user.display_name, '')
else:
username = (await profile.get_name(), '')
card = MonthlyStatsCard(
user=username,
timezone=str(lion.timezone),
now=lion.now.timestamp(),
month=int(target.timestamp()),
monthly=monthly,
current_streak=-1,
longest_streak=-1,
skin=skin | {'mode': CardMode.TEXT}
)
return card
async def counter_weekly_card(
bot: LionBot,
userid: int,
profile: UserProfile,
counter: CounterData.Counter,
guildid: int,
offset: int,
):
cog = bot.get_cog('CounterCog')
data: CounterData = cog.data
if guildid:
lion = await bot.core.lions.fetch_member(guildid, userid)
user = await lion.fetch_member()
else:
lion = await bot.core.lions.fetch_user(userid)
user = await bot.fetch_user(userid)
today = lion.today
week_start = today - timedelta(days=today.weekday()) - timedelta(weeks=offset)
days = [week_start + timedelta(i) for i in range(-7, 8 if offset else (today.weekday() + 2))]
rows = await data.CounterEntry.fetch_where(
data.CounterEntry.counterid == counter.counterid,
data.CounterEntry.userid == profile.profileid,
data.CounterEntry.created_at <= days[-1],
data.CounterEntry.created_at >= days[0],
)
events = [(row.created_at, row.value) for row in rows]
daily = events_to_dayfreq(events, days)
sessions = events_to_sessions(next(zip(*events), []))
skin = await bot.get_cog('CustomSkinCog').get_skinargs_for(
guildid, userid, WeeklyStatsCard.card_id
)
skin |= {
'title_text': f"{counter.name.upper()}",
'this_week_text': f"THIS WEEK: {{amount}} {counter.name.upper()}",
'last_week_text': f"LAST WEEK: {{amount}} {counter.name.upper()}"
}
if user:
username = (user.display_name, '')
else:
username = (await profile.get_name(), '')
card = WeeklyStatsCard(
user=username,
timezone=str(lion.timezone),
now=lion.now.timestamp(),
week=week_start.timestamp(),
daily=tuple(map(int, daily)),
sessions=sessions,
skin=skin | {'mode': CardMode.TEXT}
)
return card
def events_to_dayfreq(events: list[tuple[datetime, int]], days: list[datetime]) -> list[int]:
if not days:
return []
last_day = 0
dayts = 0
daymap = {}
for day in sorted(days, reverse=True):
dayts = day.timestamp()
last_day = last_day or (day + timedelta(days=1)).timestamp()
daymap[dayts] = 0
first_day = dayts
for tim, count in events:
timts = tim.timestamp()
if not first_day < timts < last_day:
continue
for day_start in daymap:
if timts > day_start:
daymap[day_start] += count
break
return list(reversed(daymap.values()))
def events_to_sessions(event_times: list[datetime]) -> list[tuple[int, int]]:
"""
Convert a provided list of event times to a session list.
"""
sessions = []
session_start = None
session_end = None
SESSION_GAP = 60 * 30
SESSION_RADIUS = 60 * 30
for time in sorted(event_times):
if session_start and session_end and (time - session_end).total_seconds() - SESSION_RADIUS > SESSION_GAP:
session = (int(session_start.timestamp()), int(session_end.timestamp()))
sessions.append(session)
session_start = None
session_end = None
if session_start is None:
session_start = time - timedelta(seconds=SESSION_RADIUS)
session_end = time + timedelta(seconds=SESSION_RADIUS)
if session_start and session_end:
session = (int(session_start.timestamp()), int(session_end.timestamp()))
sessions.append(session)
return sessions

View File

View File

View File

@@ -0,0 +1,8 @@
import logging
logger = logging.getLogger(__name__)
from .cog import HyperFocusCog
async def setup(bot):
await bot.add_cog(HyperFocusCog(bot))

View File

@@ -0,0 +1,306 @@
import asyncio
import json
from typing import Optional
from dataclasses import dataclass
import twitchio
from twitchio.ext import commands
from twitchAPI.type import AuthScope
import random
import datetime as dt
from datetime import timedelta, datetime
from meta import CrocBot, LionCog, LionContext, LionBot
from meta.sockets import Channel, register_channel
from utils.lib import strfdelta, utc_now
from . import logger
@dataclass
class FocusState:
userid: int | str
name: str
focus_ends: datetime
hyper: bool = True
class FocusChannel(Channel):
name = 'FocusList'
def __init__(self, cog: 'HyperFocusCog', **kwargs):
self.cog = cog
super().__init__(**kwargs)
async def on_connection(self, websocket, event):
await super().on_connection(websocket, event)
await self.reload_focus(websocket=websocket)
def focus_args(self, state: FocusState):
return (
state.userid,
state.name,
state.hyper,
state.focus_ends.isoformat(),
)
async def reload_focus(self, websocket=None):
"""
Clear tasklist and re-send current tasks.
"""
await self.send_clear(websocket=websocket)
for state in self.cog.hyperfocusing.values():
await self.send_set(*self.focus_args(state), websocket=websocket)
async def send_set(self, userid, name, hyper, end_at, websocket=None):
await self.send_event({
'type': "DO",
'method': "setFocus",
'args': {
'userid': userid,
'name': name,
'hyper': hyper,
'end_at': end_at,
}
}, websocket=websocket)
async def send_del(self, userid, websocket=None):
await self.send_event({
'type': "DO",
'method': "delFocus",
'args': {
'userid': userid,
}
}, websocket=websocket)
async def send_clear(self, websocket=None):
await self.send_event({
'type': "DO",
'method': "clearFocus",
'args': {
}
}, websocket=websocket)
class HyperFocusCog(LionCog):
def __init__(self, bot: CrocBot):
self.bot = bot
self.crocbot: CrocBot = bot.crocbot
# userid -> timestamp when they stop
self.hyperfocusing: dict[str, FocusState] = {}
self.channel = FocusChannel(self)
register_channel(self.channel.name, self.channel)
self.loaded = asyncio.Event()
async def cog_load(self):
self._load_twitch_methods(self.crocbot)
self.load_hyperfocus()
self.loaded.set()
async def cog_unload(self):
self._unload_twitch_methods(self.crocbot)
def save_hyperfocus(self):
with open('hyperfocus.json', 'w', encoding='utf-8') as f:
mapped = {
userid: {
'userid': str(state.userid),
'name': state.name,
'focus_ends': state.focus_ends.isoformat(),
'hyper': state.hyper
}
for userid, state in self.hyperfocusing.items()
}
json.dump(mapped, f, ensure_ascii=False, indent=4)
def load_hyperfocus(self):
with open('hyperfocus.json') as f:
mapped = json.load(f)
self.hyperfocusing.clear()
for userid, map in mapped.items():
self.hyperfocusing[str(userid)] = FocusState(
userid=str(map['userid']),
name=map['name'],
hyper=map['hyper'],
focus_ends=dt.datetime.fromisoformat(map['focus_ends'])
)
print(f"Loaded hyperfocus: {self.hyperfocusing}")
def check_hyperfocus(self, userid):
"""
Returns whether a user is currently in HYPERFOCUS mode!
"""
return (state := self.hyperfocusing.get(userid, None)) and utc_now() < state.focus_ends
@commands.Cog.event('event_message')
async def on_message(self, message: twitchio.Message):
if message.content and message.content.lower() == 'nice':
await message.channel.send("That's Nice")
await self.good_croccy_handler(message)
tags = message.tags
if tags and message.content and self.check_hyperfocus(tags.get('user-id')):
if not self.valid_focus_message(message):
logger.info(
f"Deleting message from hyperfocused user. {message.raw_data=}"
)
await asyncio.sleep(1)
msgid = tags['id']
# TODO: Better selection for moderator
# i.e. if the message is not from the broadcaster and we do have delete perms
# then use our own token.
broadcasterid = tags['room-id']
authcog = self.bot.get_cog('TwitchAuthCog')
if not await authcog.check_auth(broadcasterid, scopes=[AuthScope.MODERATOR_MANAGE_CHAT_MESSAGES]):
await message.channel.send(f"@{message.author.name} Stay focused! (I tried to delete your message because you are in !hyperfocus. Unfortunately I don't have the permissions to do that. But stay focused anyway!)")
else:
twitch = await authcog.fetch_client_for(broadcasterid)
await twitch.delete_chat_message(
broadcasterid,
broadcasterid,
msgid,
)
await message.channel.send(
f"@{message.author.name} Stay focused! (I deleted your message because you are in !hyperfocus, use !unfocus to come back.)"
)
async def good_croccy_handler(self, message: twitchio.Message):
if not message.content:
return
cleaned = message.content.lower().replace('@croccyhelper', '').strip()
if cleaned in ('good croc', 'good croccy', 'good helper'):
await message.channel.send("holono1Heart")
elif cleaned in ('bad croc', 'bad croccy', 'bad helper'):
await message.channel.send("holono1Sad")
async def chemical_handler(self, message: twitchio.Message):
if not message.content:
return
cleaned = message.content.lower().strip()
if cleaned in ('oh',):
await message.channel.send('Oxygen Hydrogen!')
def valid_focus_message(self, message: twitchio.Message) -> bool:
"""
Determined whether the given message is allowed to be sent in !hyperfocus.
That is, if it appears to be emote-only or a command.
"""
content = message.content
if not content:
return True
tags = message.tags or {}
to_remove = []
if (replying := tags.get('reply-parent-user-login', '')) and content.startswith('@'):
# Trim the mention from the start of the content
splits = content.split(maxsplit=1)
to_remove.append((0, len(splits[0])))
if emotesstr := tags.get('emotes', ''):
for emotestr in emotesstr.split('/'):
emote, locs = emotestr.split(':')
for loc in locs.split(','):
start, end = loc.split('-')
to_remove.append((int(start), int(end) + 1))
# Sort the pairs to remove by descending starting index
# This should allow clean removal with a loop as long as there are no intersections.
to_remove.sort(key=lambda pair: pair[0], reverse=True)
for start, end in to_remove:
content = content[:start] + content[end:]
content = content.strip().replace(' ', '').replace('\n', '')
allowed = not content or content.startswith('!') or content.startswith('*')
allowed = allowed or all(not char.isascii() for char in content)
if not allowed:
logger.info(f"Invalid hyperfocus message. Trimmed content: {content}")
return allowed
@commands.command(name='coinflip')
async def coinflip(self, ctx):
await ctx.reply(random.choice(('heads', 'tails')))
@commands.command(name='choose')
async def choose(self, ctx, *, args: str):
if not args:
await ctx.reply("Give me something to choose, e.g. !choose Heads | Tails")
else:
options = args.split('|')
options = [option.strip() for option in options]
options = [option for option in options if option]
choice = random.choice(options)
if random.random() < 0.01:
choice = "You"
await ctx.reply(f"I choose: {choice}")
@commands.command(name='hyperfocus')
async def hyperfocus_cmd(self, ctx, dur: Optional[int] = None):
userid = str(ctx.author.id)
now = utc_now()
end_time = None
if dur is None:
# Automatically select time
next_hour = now.replace(minute=0, second=0, microsecond=0) + dt.timedelta(hours=1)
next_block = next_hour - dt.timedelta(minutes=10)
if now > next_block:
# Currently in the break
next_block = next_block + dt.timedelta(hours=1)
end_time = next_block
dur = int((end_time - now).total_seconds() // 60)
elif dur > 720:
await ctx.reply("You can hyperfocus for at most 12 hours at a time!")
else:
end_time = utc_now() + dt.timedelta(minutes=dur)
if end_time is not None:
state = self.hyperfocusing[userid] = FocusState(
userid=userid,
name=ctx.author.display_name,
focus_ends=end_time,
)
self.save_hyperfocus()
await self.channel.send_set(*self.channel.focus_args(state))
await ctx.reply(
f"{ctx.author.name} has gone into HYPERFOCUS mode! "
f"They will be in emote and command only mode for the next {dur} minutes! "
"Use !unfocus if you really need to chat before then, best of luck! 🍀"
)
@commands.command(name='unfocus')
async def unfocus_cmd(self, ctx):
self.hyperfocusing.pop(ctx.author.id, None)
self.save_hyperfocus()
await self.channel.send_del(ctx.author.id)
await ctx.reply("Welcome back from focus, hope it went well! Have a comfy break and remember to have a sippie and a stretch~")
@commands.command(name='hyperfocused')
async def focused_cmd(self, ctx, user: Optional[twitchio.User] = None):
user = user if user is not None else ctx.author
userid = str(user.id)
if self.check_hyperfocus(userid):
state = self.hyperfocusing.get(userid)
end_time = state.focus_ends
durstr = strfdelta(end_time - utc_now())
await ctx.reply(
f"{user.name} is in HYPERFOCUS for another {durstr}! "
"They can only write emojis and commands in this time~ "
"(use !unfocus to come back if you need to!) "
"Good luck!"
)
elif userid != str(ctx.author.id):
await ctx.reply(
f"{user.name} is not hyperfocused!"
)
else:
await ctx.reply(
"You are not hyperfocused! "
"Enter HYPERFOCUS mode for e.g. 10 minutes by writing !hyperfocus 10"
)

View File

@@ -4,17 +4,21 @@ import json
import os
from typing import Optional
from attr import dataclass
import discord
from discord.ext import commands as cmds
from discord import app_commands as appcmds
import twitchio
from twitchio.ext import commands
from meta import CrocBot, LionCog
from meta.LionBot import LionBot
from meta import CrocBot, LionCog, LionContext, LionBot
from meta.sockets import Channel, register_channel
from utils.lib import strfdelta, utc_now
from . import logger
from .data import NowListData
from modules.profiles.profile import UserProfile
class NowDoingChannel(Channel):
name = 'NowList'
@@ -25,19 +29,7 @@ class NowDoingChannel(Channel):
async def on_connection(self, websocket, event):
await super().on_connection(websocket, event)
for task in self.cog.tasks.values():
await self.send_set(*self.task_args(task), websocket=websocket)
async def send_test_set(self):
tasks = [
(0, 'Tester0', "Testing Tasklist", True),
(1, 'Tester1', "Getting Confused", False),
(2, "Tester2", "Generating Bugs", True),
(3, "Tester3", "Fixing Bugs", False),
(4, "Tester4", "Pushing the red button", False),
]
for task in tasks:
await self.send_set(*task)
await self.reload_tasklist(websocket=websocket)
def task_args(self, task: NowListData.Task):
return (
@@ -48,6 +40,14 @@ class NowDoingChannel(Channel):
task.done_at.isoformat() if task.done_at else None,
)
async def reload_tasklist(self, websocket=None):
"""
Clear tasklist and re-send current tasks.
"""
await self.send_clear(websocket=websocket)
for task in self.cog.tasks.values():
await self.send_set(*self.task_args(task), websocket=websocket)
async def send_set(self, userid, name, task, start_at, end_at, websocket=None):
await self.send_event({
'type': "DO",
@@ -61,28 +61,28 @@ class NowDoingChannel(Channel):
}
}, websocket=websocket)
async def send_del(self, userid):
async def send_del(self, userid, websocket=None):
await self.send_event({
'type': "DO",
'method': "delTask",
'args': {
'userid': userid,
}
})
}, websocket=websocket)
async def send_clear(self):
async def send_clear(self, websocket=None):
await self.send_event({
'type': "DO",
'method': "clearTasks",
'args': {
}
})
}, websocket=websocket)
class NowDoingCog(LionCog):
def __init__(self, bot: LionBot):
self.bot = bot
self.crocbot = bot.crocbot
self.crocbot: CrocBot = bot.crocbot
self.data = bot.db.load_registry(NowListData())
self.channel = NowDoingChannel(self)
register_channel(self.channel.name, self.channel)
@@ -94,17 +94,82 @@ class NowDoingCog(LionCog):
async def cog_load(self):
await self.data.init()
await self.load_tasks()
self.bot.get_cog('ProfileCog').add_profile_migrator(self.migrate_profiles, name='task-migrator')
self._load_twitch_methods(self.crocbot)
self.loaded.set()
async def cog_unload(self):
self.loaded.clear()
self.tasks.clear()
if profiles := self.bot.get_cog('ProfileCog'):
profiles.del_profile_migrator('task-migrator')
self._unload_twitch_methods(self.crocbot)
async def migrate_profiles(self, source_profile: UserProfile, target_profile: UserProfile):
"""
Move current source task to target profile if there's room for it, otherwise annihilate
"""
await self.load_tasks()
source_task = self.tasks.pop(source_profile.profileid, None)
results = ["(Tasklist)"]
if source_task:
target_task = self.tasks.get(target_profile.profileid, None)
if target_task and (target_task.is_done or target_task.started_at < source_task.started_at):
# If target is done, remove it so we can overwrite
results.append("Removed older task from target profile.")
await target_task.delete()
target_task = None
if not target_task:
# Update source task with new profile id
await source_task.update(userid=target_profile.profileid)
target_task = source_task
await self.channel.send_set(*self.channel.task_args(target_task))
results.append("Migrated 1 currently running task from source profile.")
else:
# If there is a target task we can't overwrite, just delete the source task
await source_task.delete()
results.append("Ignoring and removing older task from source profile.")
self.tasks.pop(source_profile.profileid, None)
await self.channel.send_del(source_profile.profileid)
else:
results.append("No running task in source profile, nothing to migrate!")
await self.load_tasks()
return ' '.join(results)
async def user_profile_migration(self):
"""
Manual single-use migration method from the old userid format to the new profileid format.
"""
await self.load_tasks()
for userid, task in self.tasks.items():
userid = int(userid)
if userid > 1000:
# Assume it is a twitch userid
profile = await UserProfile.fetch_from_twitchid(self.bot, userid)
if not profile:
# Create a new profile with this twitch user
users = await self.crocbot.fetch_users(ids=[userid])
if not users:
continue
user = users[0]
profile = await UserProfile.create_from_twitch(self.bot, user)
if not await self.data.Task.fetch(profile.profileid):
await task.update(userid=profile.profileid)
else:
await task.delete()
await self.load_tasks()
await self.channel.reload_tasklist()
async def cog_check(self, ctx):
if not self.loaded.is_set():
await ctx.reply("Tasklists are still loading! Please wait a moment~")
@@ -123,25 +188,27 @@ class NowDoingCog(LionCog):
# await self.channel.send_test_set()
# await ctx.send(f"Hello {ctx.author.name}! This command does something, we aren't sure what yet.")
# await ctx.send(str(list(self.tasks.items())[0]))
await self.user_profile_migration()
await ctx.send(str(ctx.author.id))
await ctx.reply("Userid -> profile migration done.")
else:
await ctx.send(f"Hello {ctx.author.name}! I don't think you have permission to test that.")
@commands.command(aliases=['task', 'check'])
async def now(self, ctx: commands.Context, *, args: Optional[str] = None):
userid = int(ctx.author.id)
async def now(self, ctx: commands.Context | LionContext, profile: UserProfile, args: Optional[str] = None, edit=False):
args = args.strip() if args else None
userid = profile.profileid
if args:
existing = self.tasks.get(userid, None)
await self.data.Task.table.delete_where(userid=userid)
task = await self.data.Task.create(
userid=userid,
name=ctx.author.display_name,
name=await profile.get_name(),
task=args,
started_at=utc_now(),
started_at=existing.started_at if (existing and edit) else utc_now(),
)
self.tasks[task.userid] = task
await self.channel.send_set(*self.channel.task_args(task))
await ctx.send(f"Updated your current task, good luck!")
await ctx.send("Updated your current task, good luck!")
elif task := self.tasks.get(userid, None):
if task.is_done:
done_ago = strfdelta(utc_now() - task.done_at)
@@ -159,9 +226,38 @@ class NowDoingCog(LionCog):
"Show what you are currently working on with, e.g. !now Reading notes"
)
@commands.command(name='next')
async def nownext(self, ctx: commands.Context, *, args: Optional[str] = None):
userid = int(ctx.author.id)
@commands.command(
name='now',
aliases=['task', 'check']
)
async def twi_now(self, ctx: commands.Context, *, args: Optional[str] = None):
profile = await self.bot.get_cog('ProfileCog').fetch_profile_twitch(ctx.author)
await self.now(ctx, profile, args)
@cmds.hybrid_command(
name='now',
aliases=['task', 'check']
)
async def disc_now(self, ctx: LionContext, *, args: Optional[str] = None):
profile = await self.bot.get_cog('ProfileCog').fetch_profile_discord(ctx.author)
await self.now(ctx, profile, args)
@commands.command(
name='edit',
)
async def twi_edit(self, ctx: commands.Context, *, args: Optional[str] = None):
profile = await self.bot.get_cog('ProfileCog').fetch_profile_twitch(ctx.author)
await self.now(ctx, profile, args, edit=True)
@cmds.hybrid_command(
name='edit',
)
async def disc_edit(self, ctx: LionContext, *, args: Optional[str] = None):
profile = await self.bot.get_cog('ProfileCog').fetch_profile_discord(ctx.author)
await self.now(ctx, profile, args, edit=True)
async def nownext(self, ctx: commands.Context | LionContext, profile: UserProfile, args: Optional[str]):
userid = profile.profileid
task = self.tasks.get(userid, None)
if args:
if task:
@@ -176,13 +272,13 @@ class NowDoingCog(LionCog):
await self.data.Task.table.delete_where(userid=userid)
task = await self.data.Task.create(
userid=userid,
name=ctx.author.display_name,
name=await profile.get_name(),
task=args,
started_at=utc_now(),
)
self.tasks[task.userid] = task
await self.channel.send_set(*self.channel.task_args(task))
await ctx.send(f"Next task set, good luck!" + ' ' + prefix)
await ctx.send("Next task set, good luck!" + ' ' + prefix)
elif task:
if task.is_done:
done_ago = strfdelta(utc_now() - task.done_at)
@@ -200,9 +296,22 @@ class NowDoingCog(LionCog):
"Show what you are currently working on with, e.g. !now Reading notes"
)
@commands.command()
async def done(self, ctx: commands.Context):
userid = int(ctx.author.id)
@commands.command(
name='next',
)
async def twi_next(self, ctx: commands.Context, *, args: Optional[str] = None):
profile = await self.bot.get_cog('ProfileCog').fetch_profile_twitch(ctx.author)
await self.nownext(ctx, profile, args)
@cmds.hybrid_command(
name='next',
)
async def disc_next(self, ctx: LionContext, *, args: Optional[str] = None):
profile = await self.bot.get_cog('ProfileCog').fetch_profile_discord(ctx.author)
await self.nownext(ctx, profile, args)
async def done(self, ctx: commands.Context | LionContext, profile: UserProfile):
userid = profile.profileid
if task := self.tasks.get(userid, None):
if task.is_done:
await ctx.send(
@@ -222,9 +331,36 @@ class NowDoingCog(LionCog):
"Show what you are currently working on with, e.g. !now Reading notes"
)
@commands.command()
async def clear(self, ctx: commands.Context):
userid = int(ctx.author.id)
@commands.command(
name='done',
)
async def twi_done(self, ctx: commands.Context):
profile = await self.bot.get_cog('ProfileCog').fetch_profile_twitch(ctx.author)
await self.done(ctx, profile)
@cmds.hybrid_command(
name='done',
)
async def disc_done(self, ctx: LionContext):
profile = await self.bot.get_cog('ProfileCog').fetch_profile_discord(ctx.author)
await self.done(ctx, profile)
@commands.command(
name='clear',
)
async def twi_clear(self, ctx: commands.Context):
profile = await self.bot.get_cog('ProfileCog').fetch_profile_twitch(ctx.author)
await self.clear(ctx, profile)
@cmds.hybrid_command(
name='clear',
)
async def disc_clear(self, ctx: LionContext):
profile = await self.bot.get_cog('ProfileCog').fetch_profile_discord(ctx.author)
await self.clear(ctx, profile)
async def clear(self, ctx: commands.Context | LionContext, profile):
userid = profile.profileid
if task := self.tasks.pop(userid, None):
await task.delete()
await self.channel.send_del(userid)

View File

@@ -8,10 +8,12 @@ from gui.cards import FocusTimerCard, BreakTimerCard
if TYPE_CHECKING:
from .timer import Timer, Stage
from tracking.voice.cog import VoiceTrackerCog
from modules.nowdoing.cog import NowDoingCog
async def get_timer_card(bot: LionBot, timer: 'Timer', stage: 'Stage'):
voicecog: 'VoiceTrackerCog' = bot.get_cog('VoiceTrackerCog')
nowcog: 'NowDoingCog' = bot.get_cog('NowDoingCog')
name = timer.base_name
if stage is not None:
@@ -23,16 +25,22 @@ async def get_timer_card(bot: LionBot, timer: 'Timer', stage: 'Stage'):
card_users = []
guildid = timer.data.guildid
for member in timer.members:
if voicecog is not None:
session = voicecog.get_session(guildid, member.id)
tag = session.tag
if session.start_time:
session_duration = (utc_now() - session.start_time).total_seconds()
else:
session_duration = 0
profile = await bot.get_cog('ProfileCog').fetch_profile_discord(member)
task = nowcog.tasks.get(profile.profileid, None)
tag = ''
session_duration = 0
if task:
tag = task.task
session_duration = ((task.done_at or utc_now()) - task.started_at).total_seconds()
else:
session_duration = 0
tag = None
session = voicecog.get_session(guildid, member.id)
if session:
tag = session.tag
if session.start_time:
session_duration = (utc_now() - session.start_time).total_seconds()
else:
session_duration = 0
card_user = (
(member.id, (member.avatar or member.default_avatar).key),

View File

@@ -92,7 +92,7 @@ class ProfileCog(LionCog):
results.append(f"Migrated {len(twitch_rows)} attached twitch account(s).")
# And then mark the old profile as migrated
await source_profile.update(migrated=target_profile.profileid)
await source_profile.profile_row.update(migrated=target_profile.profileid)
results.append("Marking old profile as migrated.. finished!")
return results

View File

@@ -31,6 +31,30 @@ class UserProfile:
def __repr__(self):
return f"<UserProfile profileid={self.profileid} profile={self.profile_row}>"
async def get_name(self):
# TODO: Store a preferred name in the profile preferences
# TODO Should have a multi-fetch system
name = None
twitches = await self.twitch_accounts()
if twitches:
users = await self.bot.crocbot.fetch_users(
ids=[int(twitches[0].userid)]
)
if users:
user = users[0]
name = user.display_name
if not name:
discords = await self.discord_accounts()
if discords:
user = await self.bot.fetch_user(discords[0].userid)
name = user.display_name
if not name:
name = 'Unknown'
return name
async def attach_discord(self, user: discord.User | discord.Member):
"""
Attach a new discord user to this profile.
@@ -62,13 +86,21 @@ class UserProfile:
"""
Fetch the Discord accounts associated to this profile.
"""
return await self.data.DiscordProfileRow.fetch_where(profileid=self.profileid)
return await self.data.DiscordProfileRow.fetch_where(
profileid=self.profileid
).order_by(
'created_at'
)
async def twitch_accounts(self) -> list[ProfileData.TwitchProfileRow]:
"""
Fetch the Twitch accounts associated to this profile.
"""
return await self.data.TwitchProfileRow.fetch_where(profileid=self.profileid)
return await self.data.TwitchProfileRow.fetch_where(
profileid=self.profileid
).order_by(
'created_at'
)
@classmethod
async def fetch(cls, bot: LionBot, profile_id: int) -> Self:

View File

@@ -8,9 +8,16 @@ async def get_leaderboard_card(
bot: LionBot, highlightid: int, guildid: int,
mode: CardMode,
entry_data: list[tuple[int, int, int]], # userid, position, time
name_map: dict[int, str] = {},
extra_skin_args = {},
):
"""
Render a leaderboard card with given parameters.
Parameters
----------
name_map: dict[int, str]
Map of userid -> name, used first before cache or fetch.
"""
guild = bot.get_guild(guildid)
if guild is None:
@@ -20,8 +27,12 @@ async def get_leaderboard_card(
avatars = {}
names = {}
missing = []
for userid, _, _ in entry_data:
if guild and (member := guild.get_member(userid)):
if (name := name_map.get(userid, None)):
avatars[userid] = None
names[userid] = name
elif guild and (member := guild.get_member(userid)):
avatars[userid] = member.avatar.key if member.avatar else None
names[userid] = member.display_name
elif (user := bot.get_user(userid)):
@@ -65,7 +76,7 @@ async def get_leaderboard_card(
guildid, None, LeaderboardCard.card_id
)
card = LeaderboardCard(
skin=skin | {'mode': mode},
skin=skin | {'mode': mode} | extra_skin_args,
server_name=guild.name,
entries=entries,
highlight=highlight

View File

@@ -654,7 +654,7 @@ class VoiceTrackerCog(LionCog):
# ----- Commands -----
@cmds.hybrid_command(
name=_p('cmd:now', "now"),
name="tag",
description=_p(
'cmd:now|desc',
"Describe what you are working on, or see what your friends are working on!"

View File

@@ -29,13 +29,26 @@ class TwitchAuthCog(LionCog):
self.bot = bot
self.data = bot.db.load_registry(TwitchAuthData())
self.client_cache = {}
async def cog_load(self):
await self.data.init()
# ----- Auth API -----
async def fetch_client_for(self, userid: int):
...
async def fetch_client_for(self, userid: str):
authrow = await self.data.UserAuthRow.fetch(userid)
if authrow is None:
# TODO: Some user authentication error
self.client_cache.pop(userid, None)
raise ValueError("Requested user is not authenticated.")
if (twitch := self.client_cache.get(userid)) is None:
twitch = await Twitch(self.bot.config.twitch['app_id'], self.bot.config.twitch['app_secret'])
scopes = await self.data.UserAuthRow.get_scopes_for(userid)
authscopes = [AuthScope(scope) for scope in scopes]
await twitch.set_user_authentication(authrow.access_token, authscopes, authrow.refresh_token)
self.client_cache[userid] = twitch
return twitch
async def check_auth(self, userid: str, scopes: list[AuthScope] = []) -> bool:
"""
@@ -46,7 +59,9 @@ class TwitchAuthCog(LionCog):
if authrow:
if scopes:
has_scopes = await self.data.UserAuthRow.get_scopes_for(userid)
has_auth = set(map(str, scopes)).issubset(has_scopes)
desired = {scope.value for scope in scopes}
has_auth = desired.issubset(has_scopes)
logger.info(f"Auth check for `{userid}`: Requested scopes {desired}, has scopes {has_scopes}. Passed: {has_auth}")
else:
has_auth = True
else:
@@ -58,6 +73,7 @@ class TwitchAuthCog(LionCog):
Start the user authentication flow for the given userid.
Will request the given scopes along with the default ones and any existing scopes.
"""
self.client_cache.pop(userid, None)
existing_strs = await self.data.UserAuthRow.get_scopes_for(userid)
existing = map(AuthScope, existing_strs)
to_request = set(existing).union(scopes)
@@ -82,3 +98,17 @@ class TwitchAuthCog(LionCog):
await ctx.reply(flow.auth.return_auth_url())
await flow.run()
await ctx.reply("Authentication Complete!")
@cmds.hybrid_command(name='modauth')
async def cmd_modauth(self, ctx: LionContext):
if ctx.interaction:
await ctx.interaction.response.defer(ephemeral=True)
scopes = [
AuthScope.MODERATOR_READ_FOLLOWERS,
AuthScope.CHANNEL_READ_REDEMPTIONS,
AuthScope.MODERATOR_MANAGE_CHAT_MESSAGES,
]
flow = await self.start_auth(scopes=scopes)
await ctx.reply(flow.auth.return_auth_url())
await flow.run()
await ctx.reply("Authentication Complete!")

View File

@@ -64,7 +64,7 @@ class TwitchAuthData(Registry):
"""
rows = await TwitchAuthData.user_scopes.select_where(userid=userid)
return [row.scope for row in rows] if rows else []
return [row['scope'] for row in rows] if rows else []
"""

View File

@@ -56,7 +56,7 @@ class UserAuthFlow:
result = await self._comm_task
if result.get('error', None):
# TODO Custom auth errors
# This is only documented to occure when the user denies the auth
# This is only documented to occur when the user denies the auth
raise SafeCancellation(f"Could not authenticate user! Reason: {result['error_description']}")
if result.get('state', None) != self.auth.state:

View File

@@ -7,6 +7,7 @@ import iso8601 # type: ignore
import pytz
import re
import json
import asyncio
from contextvars import Context
import discord
@@ -918,3 +919,126 @@ def write_records(records: list[dict[str, Any]], stream: StringIO):
for record in records:
stream.write(','.join(map(str, record.values())))
stream.write('\n')
async def pager(ctx, pages, locked=True, start_at=0, add_cancel=False, **kwargs):
"""
Shows the user each page from the provided list `pages` one at a time,
providing reactions to page back and forth between pages.
This is done asynchronously, and returns after displaying the first page.
Parameters
----------
pages: List(Union(str, discord.Embed))
A list of either strings or embeds to display as the pages.
locked: bool
Whether only the `ctx.author` should be able to use the paging reactions.
kwargs: ...
Remaining keyword arguments are transparently passed to the reply context method.
Returns: discord.Message
This is the output message, returned for easy deletion.
"""
cancel_emoji = cross
# Handle broken input
if len(pages) == 0:
raise ValueError("Pager cannot page with no pages!")
# Post first page. Method depends on whether the page is an embed or not.
if isinstance(pages[start_at], discord.Embed):
out_msg = await ctx.reply(embed=pages[start_at], **kwargs)
else:
out_msg = await ctx.reply(pages[start_at], **kwargs)
# Run the paging loop if required
if len(pages) > 1:
task = asyncio.create_task(_pager(ctx, out_msg, pages, locked, start_at, add_cancel, **kwargs))
# ctx.tasks.append(task)
elif add_cancel:
await out_msg.add_reaction(cancel_emoji)
# Return the output message
return out_msg
async def _pager(ctx, out_msg, pages, locked, start_at, add_cancel, **kwargs):
"""
Asynchronous initialiser and loop for the `pager` utility above.
"""
# Page number
page = start_at
# Add reactions to the output message
next_emoji = ""
prev_emoji = ""
cancel_emoji = cross
try:
await out_msg.add_reaction(prev_emoji)
if add_cancel:
await out_msg.add_reaction(cancel_emoji)
await out_msg.add_reaction(next_emoji)
except discord.Forbidden:
# We don't have permission to add paging emojis
# Die as gracefully as we can
if ctx.guild:
perms = ctx.channel.permissions_for(ctx.guild.me)
if not perms.add_reactions:
await ctx.error_reply(
"Cannot page results because I do not have the `add_reactions` permission!"
)
elif not perms.read_message_history:
await ctx.error_reply(
"Cannot page results because I do not have the `read_message_history` permission!"
)
else:
await ctx.error_reply(
"Cannot page results due to insufficient permissions!"
)
else:
await ctx.error_reply(
"Cannot page results!"
)
return
# Check function to determine whether a reaction is valid
def check(reaction, user):
result = reaction.message.id == out_msg.id
result = result and str(reaction.emoji) in [next_emoji, prev_emoji]
result = result and not (user.id == ctx.bot.user.id)
result = result and not (locked and user != ctx.author)
return result
# Begin loop
while True:
# Wait for a valid reaction, break if we time out
try:
reaction, user = await ctx.bot.wait_for('reaction_add', check=check, timeout=300)
except asyncio.TimeoutError:
break
# Attempt to remove the user's reaction, silently ignore errors
asyncio.ensure_future(out_msg.remove_reaction(reaction.emoji, user))
# Change the page number
page += 1 if reaction.emoji == next_emoji else -1
page %= len(pages)
# Edit the message with the new page
active_page = pages[page]
if isinstance(active_page, discord.Embed):
await out_msg.edit(embed=active_page, **kwargs)
else:
await out_msg.edit(content=active_page, **kwargs)
# Clean up by removing the reactions
try:
await out_msg.clear_reactions()
except discord.Forbidden:
try:
await out_msg.remove_reaction(next_emoji, ctx.client.user)
await out_msg.remove_reaction(prev_emoji, ctx.client.user)
except discord.NotFound:
pass
except discord.NotFound:
pass