1 Commits

Author SHA1 Message Date
7a2ad77b91 fix(counters): Fix alias creation not remove cmds. 2024-11-22 23:08:13 +10:00
20 changed files with 34 additions and 1525 deletions

Submodule src/gui updated: 62d2484914...40bc140355

View File

@@ -32,10 +32,6 @@ active_discord = [
'.shoutouts',
'.tagstrings',
'.voiceroles',
'.hyperfocus',
'.twreminders',
'.time',
'.checkin',
]
async def setup(bot):

View File

@@ -1,8 +0,0 @@
import logging
logger = logging.getLogger(__name__)
from .cog import CheckinCog
async def setup(bot):
await bot.add_cog(CheckinCog(bot))

View File

@@ -1,154 +0,0 @@
import asyncio
from typing import Optional
import datetime as dt
from datetime import timedelta, datetime
import discord
import twitchAPI
from twitchAPI.object.eventsub import ChannelPointsCustomRewardRedemptionData
from twitchAPI.eventsub.websocket import EventSubWebsocket
from twitchAPI.type import AuthScope
import twitchio
from twitchio.ext import commands
from meta import CrocBot, LionCog, LionContext, LionBot
from utils.lib import utc_now
from . import logger
class CheckinCog(LionCog):
def __init__(self, bot: LionBot):
self.bot = bot
self.crocbot: CrocBot = bot.crocbot
self.listeners = []
self.eswebsockets = {}
async def cog_load(self):
self._loop = asyncio.get_running_loop()
self._load_twitch_methods(self.crocbot)
check_in_channel_id = self.bot.config.croccy['check_in_channel'].strip()
await self.attach_checkin_channel(check_in_channel_id)
async def cog_unload(self):
self._unload_twitch_methods(self.crocbot)
async def fetch_eventsub_for(self, channelid):
if (eventsub := self.eswebsockets.get(channelid)) is None:
authcog = self.bot.get_cog('TwitchAuthCog')
if not await authcog.check_auth(channelid, scopes=[AuthScope.CHANNEL_READ_REDEMPTIONS]):
logger.error(
f"Insufficient auth to login to registered check-in channelid {channelid}"
)
else:
twitch = await authcog.fetch_client_for(channelid)
eventsub = EventSubWebsocket(twitch)
eventsub.start()
self.eswebsockets[channelid] = eventsub
return eventsub
async def attach_checkin_channel(self, channel):
# Register a listener for the given channel (given as a string id)
eventsub = await self.fetch_eventsub_for(channel)
if eventsub:
await eventsub.listen_channel_points_custom_reward_redemption_add(channel, self.handle_redeem)
logger.info(f"Attached check-in listener to registered channel {channel}")
else:
logger.error(f"Could not attach checkin listener to registered channel {channel}")
async def handle_redeem(self, data: ChannelPointsCustomRewardRedemptionData):
# Check if the redeem is one of the 'checkin' or 'quiet checkin' redeems.
title = data.event.reward.title.lower()
# TODO: Redeem ID based registration (configured)
seeking = ('check in', 'quiet hello')
if title in seeking:
quiet = seeking.index(title)
await self.do_checkin(
data.event.broadcaster_user_id,
data.event.broadcaster_user_login,
data.event.user_id,
data.event.user_name,
quiet,
data.event.redeemed_at
)
async def do_checkin(self, channel, channel_name, user, user_name, quiet, redeemed_at):
logger.info(
f"Starting checkin process for {channel_name=}, {user_name=}, {quiet=}, {redeemed_at=}"
)
checkin_counter_name = '_checkin'
first_counter_name = '_first'
second_counter_name = '_second'
third_counter_name = '_third'
counters = self.bot.get_cog('CounterCog')
if not counters:
raise ValueError("Check-in running without counters cog loaded!")
profiles = self.bot.get_cog('ProfileCog')
if not profiles:
raise ValueError("Check-in running without profile cog loaded!")
# TODO: Relies on profile implementation detail
profile = await profiles.fetch_profile_twitch(discord.Object(id=user))
stream_start = await self.get_stream_start(channel)
# Stream has to be running for this to do anything
if stream_start is not None:
# Get all check-in redeems since the start of stream.
check_in_counter = await counters.fetch_counter(checkin_counter_name)
entries = await counters.data.CounterEntry.table.select_where(
counters.data.CounterEntry.created_at >= stream_start,
counterid=check_in_counter.counterid,
)
position = len(entries) + 1
if profile.profileid not in (e['userid'] for e in entries):
# User has not already checked in!
# Check them in
# TODO: May be worth setting custom counter time
await counters.add_to_counter(
counter=check_in_counter.name,
userid=profile.profileid,
value=1,
)
checkin_total = await counters.personal_total(checkin_counter_name, profile.profileid)
# If they deserve a first, give them that
position_total = None
if position <= 3:
counter_name = (first_counter_name, second_counter_name, third_counter_name)[position-1]
await counters.add_to_counter(
counter=counter_name,
userid=profile.profileid,
value=1,
)
position_total = await counters.personal_total(counter_name, profile.profileid)
if not quiet:
name = user_name
if position == 1:
message = f"Welcome in and congrats on first check-in {name}! You have been first {position_total}/{checkin_total} times!"
else:
# TODO: Randomised replies
# TODO: Maybe different messages for lower positions or earlier times but not explicitly giving numbers?
# Need to update this for stream calcs anyway.
message = f"Welcome in {name}! You have checked in {checkin_total} times! Let's have a productive time together~"
# Now get the channel and post
channel = self.crocbot.get_channel(channel_name)
if not channel:
logger.error(
f"Channel {channel_name} is not in cache. Cannot send checkin reply."
)
else:
await channel.send(message)
async def get_stream_start(self, channelid: str | int) -> Optional[datetime]:
future = asyncio.run_coroutine_threadsafe(self._get_stream_start(channelid), self._loop)
return future.result()
async def _get_stream_start(self, channelid: str | int) -> Optional[datetime]:
streams = await self.crocbot.fetch_streams(user_ids=[int(channelid)])
if streams:
return streams[0].started_at

View File

@@ -3,8 +3,6 @@ from enum import Enum
from typing import Optional
from datetime import timedelta
from data.base import RawExpr
from data.columns import Column
import discord
from discord.ext import commands as cmds
from discord import app_commands as appcmds
@@ -17,10 +15,9 @@ from data.queries import ORDER
from meta import LionCog, LionBot, CrocBot, LionContext
from modules.profiles.community import Community
from modules.profiles.profile import UserProfile
from utils.lib import utc_now, paginate_list, pager
from utils.lib import utc_now
from . import logger
from .data import CounterData
from .graphics.weekly import counter_weekly_card, counter_monthly_card
class PERIOD(Enum):
@@ -95,7 +92,7 @@ def counter_cmd_factory(
community: Community,
args: Optional[str]
):
await cog.show_lb(ctx, counter, args, author, community, origin)
await ctx.reply(await cog.formatted_lb(counter, args, community, origin))
async def undo_cmd(
cog,
@@ -137,10 +134,9 @@ class CounterCog(LionCog):
async def cog_load(self):
self._load_twitch_methods(self.crocbot)
await self.load_counter_commands()
await self.data.init()
await self.load_counter_commands()
await self.load_counters()
self.loaded.set()
@@ -167,8 +163,7 @@ class CounterCog(LionCog):
)
disc_cmds.append(
cmds.hybrid_command(
name=row.name,
with_app_command=False,
name=row.name
)(self.discord_callback(counter_cb))
)
@@ -180,20 +175,18 @@ class CounterCog(LionCog):
)
disc_cmds.append(
cmds.hybrid_command(
name=row.lbname,
with_app_command=False,
name=row.lbname
)(self.discord_callback(lb_cb))
)
if row.undoname:
twitch_cmds.append(
commands.command(
name=row.undoname,
name=row.undoname
)(self.twitch_callback(undo_cb))
)
disc_cmds.append(
cmds.hybrid_command(
name=row.undoname,
with_app_command=False,
name=row.undoname
)(self.discord_callback(undo_cb))
)
@@ -201,6 +194,7 @@ class CounterCog(LionCog):
self.add_twitch_command(self.crocbot, cmd)
for cmd in disc_cmds:
# cmd.cog = self
self.bot.remove_command(cmd.name)
self.bot.add_command(cmd)
print(f"Adding command: {cmd}")
@@ -375,51 +369,6 @@ class CounterCog(LionCog):
await ctx.reply("Counter userid->profileid migration done.")
# Counters commands
@commands.command()
async def counterslb(self, ctx: commands.Context, *, periodstr: Optional[str] = None):
"""
Build a leaderboard of counter totals in the given period.
"""
profiles = self.bot.get_cog('ProfileCog')
author = await profiles.fetch_profile_twitch(ctx.author)
userid = author.profileid
community = await profiles.fetch_community_twitch(await ctx.channel.user())
period, start_time = await self.parse_period(community, periodstr or '')
query = self.data.CounterEntry.table.select_where()
query.group_by('counterid')
query.select('counterid', counter_total='SUM(value)')
query.order_by('counter_total', ORDER.DESC)
# query.where(Column('counter_total') > 0)
if start_time is not None:
query.where(self.data.CounterEntry.created_at >= start_time)
query.with_no_adapter()
results = await query
query.where(self.data.CounterEntry.userid == userid)
user_results = await query
lb = {result['counterid']: result['counter_total'] for result in results}
userlb = {result['counterid']: result['counter_total'] for result in user_results}
counters = await self.data.Counter.fetch_where(counterid=list(lb.keys()))
cmap = {c.counterid: c for c in counters}
parts = []
for cid, ctotal in lb.items():
if not ctotal:
continue
counter = cmap[cid]
user_total = userlb.get(cid) or 0
parts.append(f"{counter.name}: {ctotal}")
prefix = 'top 10 ' if len(parts) > 10 else ''
parts = parts[:10]
lbstr = '; '.join(parts)
await ctx.reply(f"Counters {period.value[-1]} {prefix}leaderboard -- {lbstr}")
@commands.command()
async def counter(self, ctx: commands.Context, name: str, subcmd: Optional[str], *, args: Optional[str]=None):
if not (ctx.author.is_mod or ctx.author.is_broadcaster):
@@ -453,7 +402,8 @@ class CounterCog(LionCog):
total = await self.totals(name)
await ctx.reply(f"'{name}' counter is now: {total}")
elif subcmd == 'lb':
await self.show_lb(ctx, name, args or '', author, community, origin=ORIGIN.TWITCH)
lbstr = await self.formatted_lb(name, args or '', community)
await ctx.reply(lbstr)
elif subcmd == 'clear':
await self.reset_counter(name)
await ctx.reply(f"'{name}' counter reset.")
@@ -534,71 +484,24 @@ class CounterCog(LionCog):
return (period, start_time)
@cmds.hybrid_command(
name='counterlb',
description="Show the leaderboard for the given counter."
)
async def counterlb_dcmd(self, ctx: LionContext, counter: str, period: Optional[str] = None):
profiles = self.bot.get_cog('ProfileCog')
author = await profiles.fetch_profile_discord(ctx.author)
community = await profiles.fetch_community_discord(ctx.guild)
await self.show_lb(ctx, counter, period, author, community, ORIGIN.DISCORD)
@cmds.hybrid_command(
name='counterstats',
description="Show your stats for the given counter."
)
async def counterstats_dcmd(self, ctx: LionContext, counter: str, period: Optional[str]=None):
profiles = self.bot.get_cog('ProfileCog')
author = await profiles.fetch_profile_discord(ctx.author)
community = await profiles.fetch_community_discord(ctx.guild)
if period and period.lower() in ('monthly', 'month'):
card = await counter_monthly_card(
self.bot,
userid=ctx.author.id,
profile=author,
counter=await self.fetch_counter(counter),
guildid=ctx.guild.id,
offset=0,
)
await card.render()
await ctx.reply(file=card.as_file('stats.png'))
else:
card = await counter_weekly_card(
self.bot,
userid=ctx.author.id,
profile=author,
counter=await self.fetch_counter(counter),
guildid=ctx.guild.id,
offset=0,
)
await card.render()
await ctx.reply(file=card.as_file('stats.png'))
async def show_lb(
async def formatted_lb(
self,
ctx: commands.Context | LionContext,
counter: str,
periodstr: str,
caller: UserProfile,
community: Community,
origin: ORIGIN = ORIGIN.TWITCH
):
period, start_time = await self.parse_period(community, periodstr)
lb = await self.leaderboard(counter, start_time=start_time)
name_map = {}
for userid in lb.keys():
profile = await UserProfile.fetch(self.bot, userid)
name = await profile.get_name()
name_map[userid] = name
if not lb:
await ctx.reply(
f"{counter} {period.value[-1]} leaderboard is empty!"
)
elif origin is ORIGIN.TWITCH:
lb = await self.leaderboard(counter, start_time=start_time)
if lb:
name_map = {}
for userid in lb.keys():
profile = await UserProfile.fetch(self.bot, userid)
name = await profile.get_name()
name_map[userid] = name
parts = []
items = list(lb.items())
prefix = 'top 10 ' if len(items) > 10 else ''
@@ -608,30 +511,6 @@ class CounterCog(LionCog):
part = f"{name}: {total}"
parts.append(part)
lbstr = '; '.join(parts)
await ctx.reply(f"{counter} {period.value[-1]} {prefix}leaderboard --- {lbstr}")
elif origin is ORIGIN.DISCORD:
title = f"'{counter}' {period.value[-1]} leaderboard"
lb_strings = []
author_index = None
max_name_len = min((30, max(len(name) for name in name_map.values())))
for i, (uid, total) in enumerate(lb.items()):
if author_index is None and uid == caller.profileid:
author_index = i
lb_strings.append(
"{:<{}}\t{:<9}".format(
name_map[uid],
max_name_len,
total,
)
)
page_len = 20
pages = paginate_list(lb_strings, block_length=page_len, title=title)
start_page = author_index // page_len if author_index is not None else 0
await pager(
ctx,
pages,
start_at=start_page
)
return f"{counter} {period.value[-1]} {prefix}leaderboard --- {lbstr}"
else:
return f"{counter} {period.value[-1]} leaderboard is empty!"

View File

@@ -1,222 +0,0 @@
import itertools
from typing import Optional
from datetime import timedelta, datetime
import calendar
from meta import LionBot
from gui.cards import WeeklyStatsCard, MonthlyStatsCard
from gui.base import CardMode
from modules.profiles.profile import UserProfile
from babel import LocalBabel
from modules.statistics.lib import apply_month_offset
from ..data import CounterData
babel = LocalBabel('counters')
_ = babel._
async def counter_monthly_card(
bot: LionBot,
userid: int,
profile: UserProfile,
counter: CounterData.Counter,
guildid: int,
offset: int,
):
cog = bot.get_cog('CounterCog')
data: CounterData = cog.data
if guildid:
lion = await bot.core.lions.fetch_member(guildid, userid)
user = await lion.fetch_member()
else:
lion = await bot.core.lions.fetch_user(userid)
user = await bot.fetch_user(userid)
today = lion.today
month_start = today.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
target = apply_month_offset(month_start, offset)
target_end = (target + timedelta(days=40)).replace(day=1, hour=0, minute=0) - timedelta(days=1)
months = [target]
for i in range(0, 3):
months.append((months[-1] - timedelta(days=1)).replace(day=1))
months.reverse()
rows = await data.CounterEntry.fetch_where(
data.CounterEntry.counterid == counter.counterid,
data.CounterEntry.userid == profile.profileid,
data.CounterEntry.created_at <= target_end,
data.CounterEntry.created_at >= months[0],
)
events = [(row.created_at, row.value) for row in rows]
month_lengths = [
(calendar.monthrange(month.year, month.month)[1]) for month in months
]
month_dates = []
for month, length in zip(months, month_lengths):
for day in range(1, length + 1):
month_dates.append(datetime(month.year, month.month, day, tzinfo=month.tzinfo))
monthly_flat = events_to_dayfreq(events, month_dates)
print(monthly_flat)
monthly = []
i = 0
for length in month_lengths:
this_month = monthly_flat[i : i+length]
i += length
monthly.append(this_month)
skin = await bot.get_cog('CustomSkinCog').get_skinargs_for(
guildid, userid, MonthlyStatsCard.card_id
)
skin |= {
'title_text': f"{counter.name.upper()}",
'this_month_text': f"THIS MONTH: {{amount}} {counter.name.upper()}",
'last_month_text': f"LAST MONTH: {{amount}} {counter.name.upper()}"
}
if user:
username = (user.display_name, '')
else:
username = (await profile.get_name(), '')
card = MonthlyStatsCard(
user=username,
timezone=str(lion.timezone),
now=lion.now.timestamp(),
month=int(target.timestamp()),
monthly=monthly,
current_streak=-1,
longest_streak=-1,
skin=skin | {'mode': CardMode.TEXT}
)
return card
async def counter_weekly_card(
bot: LionBot,
userid: int,
profile: UserProfile,
counter: CounterData.Counter,
guildid: int,
offset: int,
):
cog = bot.get_cog('CounterCog')
data: CounterData = cog.data
if guildid:
lion = await bot.core.lions.fetch_member(guildid, userid)
user = await lion.fetch_member()
else:
lion = await bot.core.lions.fetch_user(userid)
user = await bot.fetch_user(userid)
today = lion.today
week_start = today - timedelta(days=today.weekday()) - timedelta(weeks=offset)
days = [week_start + timedelta(i) for i in range(-7, 8 if offset else (today.weekday() + 2))]
rows = await data.CounterEntry.fetch_where(
data.CounterEntry.counterid == counter.counterid,
data.CounterEntry.userid == profile.profileid,
data.CounterEntry.created_at <= days[-1],
data.CounterEntry.created_at >= days[0],
)
events = [(row.created_at, row.value) for row in rows]
daily = events_to_dayfreq(events, days)
sessions = events_to_sessions(next(zip(*events), []))
skin = await bot.get_cog('CustomSkinCog').get_skinargs_for(
guildid, userid, WeeklyStatsCard.card_id
)
skin |= {
'title_text': f"{counter.name.upper()}",
'this_week_text': f"THIS WEEK: {{amount}} {counter.name.upper()}",
'last_week_text': f"LAST WEEK: {{amount}} {counter.name.upper()}"
}
if user:
username = (user.display_name, '')
else:
username = (await profile.get_name(), '')
card = WeeklyStatsCard(
user=username,
timezone=str(lion.timezone),
now=lion.now.timestamp(),
week=week_start.timestamp(),
daily=tuple(map(int, daily)),
sessions=sessions,
skin=skin | {'mode': CardMode.TEXT}
)
return card
def events_to_dayfreq(events: list[tuple[datetime, int]], days: list[datetime]) -> list[int]:
if not days:
return []
last_day = 0
dayts = 0
daymap = {}
for day in sorted(days, reverse=True):
dayts = day.timestamp()
last_day = last_day or (day + timedelta(days=1)).timestamp()
daymap[dayts] = 0
first_day = dayts
for tim, count in events:
timts = tim.timestamp()
if not first_day < timts < last_day:
continue
for day_start in daymap:
if timts > day_start:
daymap[day_start] += count
break
return list(reversed(daymap.values()))
def events_to_sessions(event_times: list[datetime]) -> list[tuple[int, int]]:
"""
Convert a provided list of event times to a session list.
"""
sessions = []
session_start = None
session_end = None
SESSION_GAP = 60 * 30
SESSION_RADIUS = 60 * 30
for time in sorted(event_times):
if session_start and session_end and (time - session_end).total_seconds() - SESSION_RADIUS > SESSION_GAP:
session = (int(session_start.timestamp()), int(session_end.timestamp()))
sessions.append(session)
session_start = None
session_end = None
if session_start is None:
session_start = time - timedelta(seconds=SESSION_RADIUS)
session_end = time + timedelta(seconds=SESSION_RADIUS)
if session_start and session_end:
session = (int(session_start.timestamp()), int(session_end.timestamp()))
sessions.append(session)
return sessions

View File

@@ -1,8 +0,0 @@
import logging
logger = logging.getLogger(__name__)
from .cog import HyperFocusCog
async def setup(bot):
await bot.add_cog(HyperFocusCog(bot))

View File

@@ -1,306 +0,0 @@
import asyncio
import json
from typing import Optional
from dataclasses import dataclass
import twitchio
from twitchio.ext import commands
from twitchAPI.type import AuthScope
import random
import datetime as dt
from datetime import timedelta, datetime
from meta import CrocBot, LionCog, LionContext, LionBot
from meta.sockets import Channel, register_channel
from utils.lib import strfdelta, utc_now
from . import logger
@dataclass
class FocusState:
userid: int | str
name: str
focus_ends: datetime
hyper: bool = True
class FocusChannel(Channel):
name = 'FocusList'
def __init__(self, cog: 'HyperFocusCog', **kwargs):
self.cog = cog
super().__init__(**kwargs)
async def on_connection(self, websocket, event):
await super().on_connection(websocket, event)
await self.reload_focus(websocket=websocket)
def focus_args(self, state: FocusState):
return (
state.userid,
state.name,
state.hyper,
state.focus_ends.isoformat(),
)
async def reload_focus(self, websocket=None):
"""
Clear tasklist and re-send current tasks.
"""
await self.send_clear(websocket=websocket)
for state in self.cog.hyperfocusing.values():
await self.send_set(*self.focus_args(state), websocket=websocket)
async def send_set(self, userid, name, hyper, end_at, websocket=None):
await self.send_event({
'type': "DO",
'method': "setFocus",
'args': {
'userid': userid,
'name': name,
'hyper': hyper,
'end_at': end_at,
}
}, websocket=websocket)
async def send_del(self, userid, websocket=None):
await self.send_event({
'type': "DO",
'method': "delFocus",
'args': {
'userid': userid,
}
}, websocket=websocket)
async def send_clear(self, websocket=None):
await self.send_event({
'type': "DO",
'method': "clearFocus",
'args': {
}
}, websocket=websocket)
class HyperFocusCog(LionCog):
def __init__(self, bot: LionBot):
self.bot = bot
self.crocbot: CrocBot = bot.crocbot
# userid -> timestamp when they stop
self.hyperfocusing: dict[str, FocusState] = {}
self.channel = FocusChannel(self)
register_channel(self.channel.name, self.channel)
self.loaded = asyncio.Event()
async def cog_load(self):
self._load_twitch_methods(self.crocbot)
self.load_hyperfocus()
self.loaded.set()
async def cog_unload(self):
self._unload_twitch_methods(self.crocbot)
def save_hyperfocus(self):
with open('hyperfocus.json', 'w', encoding='utf-8') as f:
mapped = {
userid: {
'userid': str(state.userid),
'name': state.name,
'focus_ends': state.focus_ends.isoformat(),
'hyper': state.hyper
}
for userid, state in self.hyperfocusing.items()
}
json.dump(mapped, f, ensure_ascii=False, indent=4)
def load_hyperfocus(self):
with open('hyperfocus.json') as f:
mapped = json.load(f)
self.hyperfocusing.clear()
for userid, map in mapped.items():
self.hyperfocusing[str(userid)] = FocusState(
userid=str(map['userid']),
name=map['name'],
hyper=map['hyper'],
focus_ends=dt.datetime.fromisoformat(map['focus_ends'])
)
print(f"Loaded hyperfocus: {self.hyperfocusing}")
def check_hyperfocus(self, userid):
"""
Returns whether a user is currently in HYPERFOCUS mode!
"""
return (state := self.hyperfocusing.get(userid, None)) and utc_now() < state.focus_ends
@commands.Cog.event('event_message')
async def on_message(self, message: twitchio.Message):
if message.content and message.content.lower() == 'nice':
await message.channel.send("That's Nice")
await self.good_croccy_handler(message)
tags = message.tags
if tags and message.content and self.check_hyperfocus(tags.get('user-id')):
if not self.valid_focus_message(message):
logger.info(
f"Deleting message from hyperfocused user. {message.raw_data=}"
)
await asyncio.sleep(1)
msgid = tags['id']
# TODO: Better selection for moderator
# i.e. if the message is not from the broadcaster and we do have delete perms
# then use our own token.
broadcasterid = tags['room-id']
authcog = self.bot.get_cog('TwitchAuthCog')
if not await authcog.check_auth(broadcasterid, scopes=[AuthScope.MODERATOR_MANAGE_CHAT_MESSAGES]):
await message.channel.send(f"@{message.author.name} Stay focused! (I tried to delete your message because you are in !hyperfocus. Unfortunately I don't have the permissions to do that. But stay focused anyway!)")
else:
twitch = await authcog.fetch_client_for(broadcasterid)
await twitch.delete_chat_message(
broadcasterid,
broadcasterid,
msgid,
)
await message.channel.send(
f"@{message.author.name} Stay focused! (I deleted your message because you are in !hyperfocus, use !unfocus to come back.)"
)
async def good_croccy_handler(self, message: twitchio.Message):
if not message.content:
return
cleaned = message.content.lower().replace('@croccyhelper', '').strip()
if cleaned in ('good croc', 'good croccy', 'good helper'):
await message.channel.send("holono1Heart")
elif cleaned in ('bad croc', 'bad croccy', 'bad helper'):
await message.channel.send("holono1Sad")
async def chemical_handler(self, message: twitchio.Message):
if not message.content:
return
cleaned = message.content.lower().strip()
if cleaned in ('oh',):
await message.channel.send('Oxygen Hydrogen!')
def valid_focus_message(self, message: twitchio.Message) -> bool:
"""
Determined whether the given message is allowed to be sent in !hyperfocus.
That is, if it appears to be emote-only or a command.
"""
content = message.content
if not content:
return True
tags = message.tags or {}
to_remove = []
if (replying := tags.get('reply-parent-user-login', '')) and content.startswith('@'):
# Trim the mention from the start of the content
splits = content.split(maxsplit=1)
to_remove.append((0, len(splits[0])))
if emotesstr := tags.get('emotes', ''):
for emotestr in emotesstr.split('/'):
emote, locs = emotestr.split(':')
for loc in locs.split(','):
start, end = loc.split('-')
to_remove.append((int(start), int(end) + 1))
# Sort the pairs to remove by descending starting index
# This should allow clean removal with a loop as long as there are no intersections.
to_remove.sort(key=lambda pair: pair[0], reverse=True)
for start, end in to_remove:
content = content[:start] + content[end:]
content = content.strip().replace(' ', '').replace('\n', '')
allowed = not content or content.startswith('!') or content.startswith('*')
allowed = allowed or all(not char.isascii() for char in content)
if not allowed:
logger.info(f"Invalid hyperfocus message. Trimmed content: {content}")
return allowed
@commands.command(name='coinflip')
async def coinflip(self, ctx):
await ctx.reply(random.choice(('heads', 'tails')))
@commands.command(name='choose')
async def choose(self, ctx, *, args: str):
if not args:
await ctx.reply("Give me something to choose, e.g. !choose Heads | Tails")
else:
options = args.split('|')
options = [option.strip() for option in options]
options = [option for option in options if option]
choice = random.choice(options)
if random.random() < 0.01:
choice = "You"
await ctx.reply(f"I choose: {choice}")
@commands.command(name='hyperfocus')
async def hyperfocus_cmd(self, ctx, dur: Optional[int] = None):
userid = str(ctx.author.id)
now = utc_now()
end_time = None
if dur is None:
# Automatically select time
next_hour = now.replace(minute=0, second=0, microsecond=0) + dt.timedelta(hours=1)
next_block = next_hour - dt.timedelta(minutes=10)
if now > next_block:
# Currently in the break
next_block = next_block + dt.timedelta(hours=1)
end_time = next_block
dur = int((end_time - now).total_seconds() // 60)
elif dur > 720:
await ctx.reply("You can hyperfocus for at most 12 hours at a time!")
else:
end_time = utc_now() + dt.timedelta(minutes=dur)
if end_time is not None:
state = self.hyperfocusing[userid] = FocusState(
userid=userid,
name=ctx.author.display_name,
focus_ends=end_time,
)
self.save_hyperfocus()
await self.channel.send_set(*self.channel.focus_args(state))
await ctx.reply(
f"{ctx.author.name} has gone into HYPERFOCUS mode! "
f"They will be in emote and command only mode for the next {dur} minutes! "
"Use !unfocus if you really need to chat before then, best of luck! 🍀"
)
@commands.command(name='unfocus')
async def unfocus_cmd(self, ctx):
self.hyperfocusing.pop(ctx.author.id, None)
self.save_hyperfocus()
await self.channel.send_del(ctx.author.id)
await ctx.reply("Welcome back from focus, hope it went well! Have a comfy break and remember to have a sippie and a stretch~")
@commands.command(name='hyperfocused')
async def focused_cmd(self, ctx, user: Optional[twitchio.User] = None):
user = user if user is not None else ctx.author
userid = str(user.id)
if self.check_hyperfocus(userid):
state = self.hyperfocusing.get(userid)
end_time = state.focus_ends
durstr = strfdelta(end_time - utc_now())
await ctx.reply(
f"{user.name} is in HYPERFOCUS for another {durstr}! "
"They can only write emojis and commands in this time~ "
"(use !unfocus to come back if you need to!) "
"Good luck!"
)
elif userid != str(ctx.author.id):
await ctx.reply(
f"{user.name} is not hyperfocused!"
)
else:
await ctx.reply(
"You are not hyperfocused! "
"Enter HYPERFOCUS mode for e.g. 10 minutes by writing !hyperfocus 10"
)

View File

@@ -38,7 +38,7 @@ class UserProfile:
twitches = await self.twitch_accounts()
if twitches:
users = await self.bot.crocbot.fetch_users(
ids=[int(twitches[0].userid)]
ids=[int(twitch.userid) for twitch in twitches]
)
if users:
user = users[0]
@@ -86,21 +86,13 @@ class UserProfile:
"""
Fetch the Discord accounts associated to this profile.
"""
return await self.data.DiscordProfileRow.fetch_where(
profileid=self.profileid
).order_by(
'created_at'
)
return await self.data.DiscordProfileRow.fetch_where(profileid=self.profileid)
async def twitch_accounts(self) -> list[ProfileData.TwitchProfileRow]:
"""
Fetch the Twitch accounts associated to this profile.
"""
return await self.data.TwitchProfileRow.fetch_where(
profileid=self.profileid
).order_by(
'created_at'
)
return await self.data.TwitchProfileRow.fetch_where(profileid=self.profileid)
@classmethod
async def fetch(cls, bot: LionBot, profile_id: int) -> Self:

View File

@@ -8,16 +8,9 @@ async def get_leaderboard_card(
bot: LionBot, highlightid: int, guildid: int,
mode: CardMode,
entry_data: list[tuple[int, int, int]], # userid, position, time
name_map: dict[int, str] = {},
extra_skin_args = {},
):
"""
Render a leaderboard card with given parameters.
Parameters
----------
name_map: dict[int, str]
Map of userid -> name, used first before cache or fetch.
"""
guild = bot.get_guild(guildid)
if guild is None:
@@ -27,12 +20,8 @@ async def get_leaderboard_card(
avatars = {}
names = {}
missing = []
for userid, _, _ in entry_data:
if (name := name_map.get(userid, None)):
avatars[userid] = None
names[userid] = name
elif guild and (member := guild.get_member(userid)):
if guild and (member := guild.get_member(userid)):
avatars[userid] = member.avatar.key if member.avatar else None
names[userid] = member.display_name
elif (user := bot.get_user(userid)):
@@ -76,7 +65,7 @@ async def get_leaderboard_card(
guildid, None, LeaderboardCard.card_id
)
card = LeaderboardCard(
skin=skin | {'mode': mode} | extra_skin_args,
skin=skin | {'mode': mode},
server_name=guild.name,
entries=entries,
highlight=highlight

View File

@@ -1,8 +0,0 @@
import logging
logger = logging.getLogger(__name__)
from .cog import TimeCog
async def setup(bot):
await bot.add_cog(TimeCog(bot))

View File

@@ -1,110 +0,0 @@
import datetime as dt
import twitchio
from twitchio.ext import commands
from meta import CrocBot, LionCog, LionContext, LionBot
from utils.lib import strfdelta, utc_now, parse_dur
from . import logger
class TimeCog(LionCog):
def __init__(self, bot: LionBot):
self.bot = bot
self.crocbot: CrocBot = bot.crocbot
async def cog_load(self):
self._load_twitch_methods(self.crocbot)
async def cog_unload(self):
self._unload_twitch_methods(self.crocbot)
async def get_timezone_for(self, profile):
timezone = None
discords = await profile.discord_accounts()
if discords:
userid = discords[0].userid
luser = await self.bot.core.lions.fetch_user(userid)
if luser:
timezone = luser.config.timezone.value
return timezone
def get_timestr(self, tz, brief=False):
"""
Get the current time in the given timezone, using a fixed format string.
"""
format_str = "%H:%M, %d/%m/%Y" if brief else "%I:%M %p (%Z) on %a, %d/%m/%Y"
now = dt.datetime.now(tz=tz)
return now.strftime(format_str)
async def time_diff(self, tz, auth_tz, name, brief=False):
"""
Get a string representing the time difference between the user's timezone and the given one.
"""
if auth_tz is None or tz is None:
return None
author_time = dt.datetime.now(tz=auth_tz)
other_time = dt.datetime.now(tz=tz)
timediff = other_time.replace(tzinfo=None) - author_time.replace(tzinfo=None)
diffsecs = round(timediff.total_seconds())
if diffsecs == 0:
return ", the same as {}!".format(name)
modifier = "behind" if diffsecs > 0 else "ahead"
diffsecs = abs(diffsecs)
hours, remainder = divmod(diffsecs, 3600)
mins, _ = divmod(remainder, 60)
hourstr = "{} hour{} ".format(hours, "s" if hours > 1 else "") if hours else ""
minstr = "{} minutes ".format(mins) if mins else ""
joiner = "and " if (hourstr and minstr) else ""
return ". {} is {}{}{}{}, at {}.".format(
name, hourstr, joiner, minstr, modifier, self.get_timestr(auth_tz, brief=brief)
)
@commands.command(name='time', aliases=['ti'])
async def time_cmd(self, ctx, *, args: str=''):
"""
Current usage is
!time
!time <target user>
Planned:
!time set ...
!time at ...
"""
authprofile = await self.bot.get_cog('ProfileCog').fetch_profile_twitch(ctx.author)
authtz = await self.get_timezone_for(authprofile)
if args:
target_tw = await self.crocbot.seek_user(args)
if target_tw is None:
return await ctx.reply(f"Couldn't find user '{args}'!")
target = await self.bot.get_cog('ProfileCog').fetch_profile_twitch(target_tw)
targettz = await self.get_timezone_for(target)
name = await target.get_name()
if targettz is None:
return await ctx.reply(
f"{name} hasn't set their timezone! Ask them to set it with '/my timezone' on discord."
)
else:
target = None
targettz = None
name = None
if authtz is None:
return await ctx.reply(
"You haven't set your timezone! Set it on discord by linking your Twitch account with `/profiles link twitch`, and then using `/my timezone`"
)
timestr = self.get_timestr(targettz if target else authtz)
name = name or await authprofile.get_name()
if target:
tdiffstr = await self.time_diff(targettz, authtz, await authprofile.get_name())
msg = f"The current time for {name} is {timestr}{tdiffstr}"
else:
msg = f"The current time for {name} is {timestr}"
await ctx.reply(msg)

View File

@@ -1,8 +0,0 @@
import logging
logger = logging.getLogger(__name__)
from .cog import ReminderCog
async def setup(bot):
await bot.add_cog(ReminderCog(bot))

View File

@@ -1,369 +0,0 @@
import asyncio
import json
import re
import itertools
from typing import Optional
from dataclasses import dataclass
from collections import defaultdict
from dateutil.parser import ParserError, parse
import twitchio
from twitchio.ext import commands
import datetime as dt
from datetime import timedelta, datetime
from meta import CrocBot, LionCog, LionContext, LionBot
from utils.lib import strfdelta, utc_now, parse_dur
from . import logger
reminder_regex = re.compile(
r"""
(^)?(?P<type> (?: \b in) | (?: every) | (?P<at> at))
\s*
(?(at) (?P<time> \d?\d (?: :\d\d)?\s*(?: am | pm)?) | (?P<duration> (?: day| hour| (?:\d+\s*(?:(?:d|h|m|s)[a-zA-Z]*)?(?:\s|and)*)+)))
(?:(?(1) (?:, | ; | : | \. | to)?\s+ | $ ))
""",
re.IGNORECASE | re.VERBOSE | re.DOTALL
)
@dataclass
class Reminder:
userid: int
content: str
name: str
channel: str
remind_at: datetime
class ReminderCog(LionCog):
def __init__(self, bot: LionBot):
self.bot = bot
self.crocbot: CrocBot = bot.crocbot
self.loaded = asyncio.Event()
self.reminders: dict[int, list[Reminder]] = defaultdict(list)
self.next_reminder_task = None
self._reminder_wait_task = None
self.reminder_lock = asyncio.Lock()
async def cog_load(self):
await self.load_reminders()
self._load_twitch_methods(self.crocbot)
self.loaded.set()
async def ensure_loaded(self):
if not self.loaded.is_set():
await self.cog_load()
async def cog_unload(self):
self._unload_twitch_methods(self.crocbot)
async def cog_check(self, ctx):
await self.ensure_loaded()
return True
def save_reminders(self):
with open('reminders.json', 'w', encoding='utf-8') as f:
mapped = {
int(userid): [
{
'userid': int(state.userid),
'name': state.name,
'channel': state.channel,
'content': state.content,
'remind_at': state.remind_at.isoformat(),
}
for state in states
]
for userid, states in self.reminders.items()
}
json.dump(mapped, f, ensure_ascii=False, indent=4)
async def load_reminders(self):
if self.next_reminder_task and not self.next_reminder_task.cancelled():
self.next_reminder_task.cancel()
self.next_reminder_task = None
with open('reminders.json') as f:
mapped = json.load(f)
self.reminders.clear()
for userid, states in mapped.items():
userid = int(userid)
for map in states:
reminder = Reminder(
userid=int(map['userid']),
content=map['content'],
name=map['name'],
channel=map['channel'],
remind_at=dt.datetime.fromisoformat(map['remind_at'])
)
self.reminders[userid].append(reminder)
self.schedule_next_reminder()
logger.info(f"Loaded reminders: {self.reminders}")
def schedule_next_reminder(self):
"""
Schedule the next reminder in the queue, if it exists, and return it.
Cancels any currently running task.
"""
if not self.reminders:
return None
next_reminder = min(
itertools.chain(*self.reminders.values()), key=lambda r: r.remind_at, default=None
)
if next_reminder:
self.next_reminder_task = asyncio.create_task(self.run_reminder(next_reminder))
else:
# We still need to cancel any ongoing reminders
if self._reminder_wait_task and not self._reminder_wait_task.cancelled():
self._reminder_wait_task.cancel()
async def run_reminder(self, reminder: Reminder):
"""
Wait for and then run the given reminder.
Expects to be cancelled if another reminder is scheduled earlier.
"""
# Cancel the next reminder wait task.
# If the next reminder is currently executing/firing,
# this will do nothing and we will wait until it is finished.
if self._reminder_wait_task and not self._reminder_wait_task.cancelled():
self._reminder_wait_task.cancel()
# This ensures that only one reminder task runs at once
async with self.reminder_lock:
now = utc_now()
to_wait = (reminder.remind_at - now).total_seconds()
try:
self._reminder_wait_task = asyncio.create_task(asyncio.sleep(to_wait))
await self._reminder_wait_task
except asyncio.CancelledError:
# Reminder task was cancelled
raise
# Now fire the reminder
await self.fire_reminder(reminder)
# And schedule the next reminder if needed
self.schedule_next_reminder()
async def fire_reminder(self, reminder: Reminder):
"""
Actually run the given reminder.
"""
# Check that this reminder is still valid
if reminder not in self.reminders[reminder.userid]:
logger.error(f"Reminder {reminder!r} is firing but not scheduled!")
return
# We don't want to reschedule while a reminder is running
# Get the channel to send to
destination = self.crocbot.get_channel(reminder.channel)
if destination is None:
logger.info(f"Reminder couldn't get channel '{reminder.channel}'. Trying again in a minute.")
# In case we aren't actually ready yet
await self.crocbot.wait_for_ready()
try:
await asyncio.sleep(60)
except asyncio.CancelledError:
logger.info("Cancelling channel wait task for reminder.")
raise
destination = self.crocbot.get_channel(reminder.channel)
if destination is None:
# This means we haven't joined the channel
logger.warning(f"Reminder couldn't get channel '{reminder.channel}' for the second time. Cancelling.")
else:
logger.info(f"Channel '{reminder.channel}' found as {destination}. Continuing.")
if destination is not None:
# Send the reminder
msg = f"@{reminder.name}, you asked me to remind you: {reminder.content}"
await destination.send(msg)
# This should really be based on a reminderid but oh well
# It's theoretically possible for a reminder to be scheduled at the same time as it is run
# In which case the wrong reminder will be removed.
self.reminders[reminder.userid].remove(reminder)
self.save_reminders()
def get_reminders_for(self, userid: int):
return self.reminders.get(userid, [])
@commands.command(name='remindme', aliases=['reminders', 'reminder'])
async def remindme_cmd(self, ctx, *, args: str=''):
args = args.strip()
userid = int(ctx.author.id)
existing = self.get_reminders_for(userid)
existing.sort(key=lambda r: r.remind_at, reverse=False)
now = utc_now()
if not args or args.lower() in ('show', 'list'):
# Show user's current reminders or show usage
if not existing:
await ctx.reply(
"USAGE: !remindme <task> in <dur> EG: !remindme Coffee is ready in 10m | !remindme in 10m, Coffee is ready"
)
elif len(existing) == 1:
reminder = existing[0]
dur = reminder.remind_at - now
sec = (dur.total_seconds()) < 60
formatted_dur = strfdelta(dur, short=False, sec=sec)
await ctx.reply(
f"I will remind you about '{reminder.content}' in about {formatted_dur}. Use !remindme cancel to cancel!"
)
else:
parts = []
for i, reminder in enumerate(existing, start=1):
dur = reminder.remind_at - now
sec = (dur.total_seconds()) < 60
formatted_dur = strfdelta(dur, short=True, sec=sec)
parts.append(
f"{i}: '{reminder.content}' in {formatted_dur}"
)
remstr = '; '.join(parts)
if len(remstr) > 290:
remstr = remstr[:290] + '...'
await ctx.reply(
f"Active Reminders: {remstr}. Use '!remindme cancel n' or '!remindme clear' to remove!"
)
elif args.lower() in ('clear', 'clearall', 'remove all'):
# Remove all reminders
if existing:
self.reminders.pop(userid, None)
self.save_reminders()
self.schedule_next_reminder()
else:
await ctx.reply("You don't have any reminders set!")
elif args.lower().split(maxsplit=1)[0] in ('remove', 'cancel'):
splits = args.split(maxsplit=1)
remaining = splits[1].strip() if len(splits) > 1 else ''
# Remove a specified reminder
to_remove = None
if not existing:
await ctx.reply("You don't have any reminders set!")
elif len(existing) == 1:
to_remove = existing[0]
elif remaining.isdigit():
# Try to the remove the reminder with the give number
given = int(remaining)
if given > len(existing):
await ctx.reply(f"You only have {len(existing)} reminders!")
else:
to_remove = existing[given - 1]
else:
# Invalid arguments, show usage
await ctx.reply(
"USAGE: !remindme cancel <number>, e.g. !remindme cancel 1 to cancel your first reminder!"
)
if to_remove is not None:
self.reminders[userid].remove(to_remove)
await ctx.reply(
f"Cancelled your reminder '{to_remove.content}'"
)
self.save_reminders()
self.schedule_next_reminder()
else:
# Parse for reminder
content = None
duration = None
repeating = None
# First parse it
match = re.search(reminder_regex, args)
if match:
typ = match.group('type').lower().strip()
content = (args[:match.start()] + args[match.end():]).strip()
if typ in ('every', 'in'):
repeating = typ == 'every'
duration_str = match.group('duration').lower()
if duration_str.isdigit():
# Default to minutes if no unit given
duration = int(duration_str) * 60
elif duration_str in ('day', 'a day'):
duration = 24 * 60 * 60
elif duration_str in ('hour', 'an hour'):
duration = 60 * 60
else:
duration = parse_dur(duration_str)
elif typ == 'at':
# Get timezone for this member.
profile = await self.bot.get_cog('ProfileCog').fetch_profile_twitch(ctx.author)
timezone = None
discords = await profile.discord_accounts()
if discords:
luserid = discords[0].userid
luser = await self.bot.core.lions.fetch_user(luserid)
if luser:
timezone = luser.config.timezone.value
if not timezone:
return await ctx.reply(
"Sorry, to use this you have to link your account with `/profiles link twitch` and set your timezone with '/my timezone' on the Discord!"
)
time_str = match.group('time').lower()
if time_str.isdigit():
# Assume it's an hour
time_str = time_str + ':00'
default = dt.datetime.now(tz=timezone).replace(hour=0, minute=0, second=0, microsecond=0)
try:
ts = parse(time_str, fuzzy=True, default=default)
except ParserError:
return await ctx.reply(
"Sorry, I didn't understand your target time! Please use e.g. !remindme Remember to hydrate at 10pm"
)
while ts < dt.datetime.now(tz=timezone):
ts += dt.timedelta(days=1)
duration = (ts - dt.datetime.now(tz=timezone)).total_seconds()
duration = int(duration)
if content.startswith('to '):
content = content[3:].strip()
else:
# Legacy parsing, without requiring "in" at the front
splits = args.split(maxsplit=1)
if len(splits) == 2 and splits[0].isdigit():
repeating = False
duration = int(splits[0]) * 60
content = splits[1].strip()
# Sanity checking
if not duration or not content:
return await ctx.reply(
"Sorry, I didn't understand your reminder! Please use e.g. !remindme Coffee is ready in 10m"
)
if repeating:
return await ctx.reply(
"Sorry, we don't support repeating reminders right now!"
)
if len(existing) > 10:
return await ctx.reply(
"Sorry, you can only have 10 active reminders! Use !remindme cancel or !remindme clear to cancel some!"
)
reminder = Reminder(
userid=userid,
content=content,
name=ctx.author.name,
channel=ctx.channel.name,
remind_at=now + timedelta(seconds=duration)
)
self.reminders[userid].append(reminder)
dur = reminder.remind_at - now
sec = (dur.total_seconds()) < 60
formatted_dur = strfdelta(dur, short=False, sec=sec)
msg = f"Got it! I will remind you in {formatted_dur}!"
await ctx.reply(msg)
self.save_reminders()
self.schedule_next_reminder()

View File

@@ -29,26 +29,13 @@ class TwitchAuthCog(LionCog):
self.bot = bot
self.data = bot.db.load_registry(TwitchAuthData())
self.client_cache = {}
async def cog_load(self):
await self.data.init()
# ----- Auth API -----
async def fetch_client_for(self, userid: str):
authrow = await self.data.UserAuthRow.fetch(userid)
if authrow is None:
# TODO: Some user authentication error
self.client_cache.pop(userid, None)
raise ValueError("Requested user is not authenticated.")
if (twitch := self.client_cache.get(userid)) is None:
twitch = await Twitch(self.bot.config.twitch['app_id'], self.bot.config.twitch['app_secret'])
scopes = await self.data.UserAuthRow.get_scopes_for(userid)
authscopes = [AuthScope(scope) for scope in scopes]
await twitch.set_user_authentication(authrow.access_token, authscopes, authrow.refresh_token)
self.client_cache[userid] = twitch
return twitch
async def fetch_client_for(self, userid: int):
...
async def check_auth(self, userid: str, scopes: list[AuthScope] = []) -> bool:
"""
@@ -59,9 +46,7 @@ class TwitchAuthCog(LionCog):
if authrow:
if scopes:
has_scopes = await self.data.UserAuthRow.get_scopes_for(userid)
desired = {scope.value for scope in scopes}
has_auth = desired.issubset(has_scopes)
logger.info(f"Auth check for `{userid}`: Requested scopes {desired}, has scopes {has_scopes}. Passed: {has_auth}")
has_auth = set(map(str, scopes)).issubset(has_scopes)
else:
has_auth = True
else:
@@ -73,7 +58,6 @@ class TwitchAuthCog(LionCog):
Start the user authentication flow for the given userid.
Will request the given scopes along with the default ones and any existing scopes.
"""
self.client_cache.pop(userid, None)
existing_strs = await self.data.UserAuthRow.get_scopes_for(userid)
existing = map(AuthScope, existing_strs)
to_request = set(existing).union(scopes)
@@ -98,17 +82,3 @@ class TwitchAuthCog(LionCog):
await ctx.reply(flow.auth.return_auth_url())
await flow.run()
await ctx.reply("Authentication Complete!")
@cmds.hybrid_command(name='modauth')
async def cmd_modauth(self, ctx: LionContext):
if ctx.interaction:
await ctx.interaction.response.defer(ephemeral=True)
scopes = [
AuthScope.MODERATOR_READ_FOLLOWERS,
AuthScope.CHANNEL_READ_REDEMPTIONS,
AuthScope.MODERATOR_MANAGE_CHAT_MESSAGES,
]
flow = await self.start_auth(scopes=scopes)
await ctx.reply(flow.auth.return_auth_url())
await flow.run()
await ctx.reply("Authentication Complete!")

View File

@@ -64,7 +64,7 @@ class TwitchAuthData(Registry):
"""
rows = await TwitchAuthData.user_scopes.select_where(userid=userid)
return [row['scope'] for row in rows] if rows else []
return [row.scope for row in rows] if rows else []
"""

View File

@@ -7,7 +7,6 @@ import iso8601 # type: ignore
import pytz
import re
import json
import asyncio
from contextvars import Context
import discord
@@ -342,9 +341,9 @@ def strfdelta(delta: datetime.timedelta, sec=False, minutes=True, short=False) -
return "".join(reply_msg)
def parse_dur(time_str: str) -> int:
def _parse_dur(time_str: str) -> int:
"""
Parses a user provided time duration string into an integer number of seconds.
Parses a user provided time duration string into a timedelta object.
Parameters
----------
@@ -919,126 +918,3 @@ def write_records(records: list[dict[str, Any]], stream: StringIO):
for record in records:
stream.write(','.join(map(str, record.values())))
stream.write('\n')
async def pager(ctx, pages, locked=True, start_at=0, add_cancel=False, **kwargs):
"""
Shows the user each page from the provided list `pages` one at a time,
providing reactions to page back and forth between pages.
This is done asynchronously, and returns after displaying the first page.
Parameters
----------
pages: List(Union(str, discord.Embed))
A list of either strings or embeds to display as the pages.
locked: bool
Whether only the `ctx.author` should be able to use the paging reactions.
kwargs: ...
Remaining keyword arguments are transparently passed to the reply context method.
Returns: discord.Message
This is the output message, returned for easy deletion.
"""
cancel_emoji = cross
# Handle broken input
if len(pages) == 0:
raise ValueError("Pager cannot page with no pages!")
# Post first page. Method depends on whether the page is an embed or not.
if isinstance(pages[start_at], discord.Embed):
out_msg = await ctx.reply(embed=pages[start_at], **kwargs)
else:
out_msg = await ctx.reply(pages[start_at], **kwargs)
# Run the paging loop if required
if len(pages) > 1:
task = asyncio.create_task(_pager(ctx, out_msg, pages, locked, start_at, add_cancel, **kwargs))
# ctx.tasks.append(task)
elif add_cancel:
await out_msg.add_reaction(cancel_emoji)
# Return the output message
return out_msg
async def _pager(ctx, out_msg, pages, locked, start_at, add_cancel, **kwargs):
"""
Asynchronous initialiser and loop for the `pager` utility above.
"""
# Page number
page = start_at
# Add reactions to the output message
next_emoji = ""
prev_emoji = ""
cancel_emoji = cross
try:
await out_msg.add_reaction(prev_emoji)
if add_cancel:
await out_msg.add_reaction(cancel_emoji)
await out_msg.add_reaction(next_emoji)
except discord.Forbidden:
# We don't have permission to add paging emojis
# Die as gracefully as we can
if ctx.guild:
perms = ctx.channel.permissions_for(ctx.guild.me)
if not perms.add_reactions:
await ctx.error_reply(
"Cannot page results because I do not have the `add_reactions` permission!"
)
elif not perms.read_message_history:
await ctx.error_reply(
"Cannot page results because I do not have the `read_message_history` permission!"
)
else:
await ctx.error_reply(
"Cannot page results due to insufficient permissions!"
)
else:
await ctx.error_reply(
"Cannot page results!"
)
return
# Check function to determine whether a reaction is valid
def check(reaction, user):
result = reaction.message.id == out_msg.id
result = result and str(reaction.emoji) in [next_emoji, prev_emoji]
result = result and not (user.id == ctx.bot.user.id)
result = result and not (locked and user != ctx.author)
return result
# Begin loop
while True:
# Wait for a valid reaction, break if we time out
try:
reaction, user = await ctx.bot.wait_for('reaction_add', check=check, timeout=300)
except asyncio.TimeoutError:
break
# Attempt to remove the user's reaction, silently ignore errors
asyncio.ensure_future(out_msg.remove_reaction(reaction.emoji, user))
# Change the page number
page += 1 if reaction.emoji == next_emoji else -1
page %= len(pages)
# Edit the message with the new page
active_page = pages[page]
if isinstance(active_page, discord.Embed):
await out_msg.edit(embed=active_page, **kwargs)
else:
await out_msg.edit(content=active_page, **kwargs)
# Clean up by removing the reactions
try:
await out_msg.clear_reactions()
except discord.Forbidden:
try:
await out_msg.remove_reaction(next_emoji, ctx.client.user)
await out_msg.remove_reaction(prev_emoji, ctx.client.user)
except discord.NotFound:
pass
except discord.NotFound:
pass