Merge branch 'feature-premium' into rewrite

This commit is contained in:
2023-10-26 13:50:38 +03:00
14 changed files with 1341 additions and 10 deletions

View File

@@ -17,7 +17,7 @@ invite_bot =
[ENDPOINTS]
guild_log =
gem_transaction =
gem_log =
[LOGGING]
log_file = bot.log

View File

@@ -20,6 +20,7 @@ active = [
'.meta',
'.sponsors',
'.topgg',
'.premium',
'.test',
]

View File

@@ -5,6 +5,7 @@ from typing import Optional
import discord
from core.lion_guild import LionGuild
from data.queries import ORDER
from meta import LionBot
from utils.lib import MessageArgs, jumpto, strfdelta, utc_now
from utils.monitor import TaskMonitor
@@ -86,7 +87,9 @@ class Ticket:
instantiate the correct classes.
"""
registry: ModerationData = bot.db.registries['ModerationData']
rows = await registry.Ticket.fetch_where(*args, **kwargs)
rows = await registry.Ticket.fetch_where(*args, **kwargs).order_by(
'created_at', ORDER.DESC,
)
tickets = []
if rows:
guildids = set(row.guildid for row in rows)

View File

@@ -0,0 +1,10 @@
import logging
from babel.translator import LocalBabel
babel = LocalBabel('premium')
logger = logging.getLogger(__name__)
async def setup(bot):
from .cog import PremiumCog
await bot.add_cog(PremiumCog(bot))

713
src/modules/premium/cog.py Normal file
View File

@@ -0,0 +1,713 @@
from typing import Optional
import asyncio
import discord
from discord.ext import commands as cmds
import discord.app_commands as appcmds
from discord.ui.button import Button, ButtonStyle
from discord.ui.text_input import TextInput, TextStyle
from meta import LionCog, LionBot, LionContext
from meta.errors import SafeCancellation, UserInputError
from meta.logger import log_wrap
from utils.lib import utc_now
from utils.ui import FastModal
from wards import sys_admin_ward
from constants import MAX_COINS
from . import logger, babel
from .data import PremiumData, GemTransactionType
from .ui.transactions import TransactionList
from .ui.premium import PremiumUI
from .errors import GemTransactionFailed, BalanceTooLow, BalanceTooHigh
_p = babel._p
class PremiumCog(LionCog):
buy_gems_link = "https://lionbot.org/donate"
def __init__(self, bot: LionBot):
self.bot = bot
self.data: PremiumData = bot.db.load_registry(PremiumData())
self.gem_logger: Optional[discord.Webhook] = None
async def cog_load(self):
await self.data.init()
if (leo_setting_cog := self.bot.get_cog('LeoSettings')) is not None:
self.crossload_group(self.leo_group, leo_setting_cog.leo_group)
if (gem_log_url := self.bot.config.endpoints.get('gem_log', None)) is not None:
self.gem_logger = discord.Webhook.from_url(gem_log_url, session=self.bot.web_client)
# ----- API -----
def buy_gems_button(self) -> Button:
t = self.bot.translator.t
button = Button(
style=ButtonStyle.link,
label=t(_p(
'button:gems|label',
"Buy Gems"
)),
emoji=self.bot.config.emojis.gem,
url=self.buy_gems_link,
)
return button
async def get_gem_balance(self, userid: int) -> int:
"""
Get the up-to-date gem balance for this user.
Creates the User row if it does not already exist.
"""
record = await self.bot.core.data.User.fetch(userid, cached=False)
if record is None:
record = await self.bot.core.data.User.create(userid=userid)
return record.gems
async def get_gift_count(self, userid: int) -> int:
"""
Compute the number of gifts this user has sent, by counting Transaction rows.
"""
record = await self.data.GemTransaction.table.select_where(
from_account=userid,
transaction_type=GemTransactionType.GIFT,
).select(
gift_count='COUNT(*)'
).with_no_adapter()
return record[0]['gift_count'] or 0
async def is_premium_guild(self, guildid: int) -> bool:
"""
Check whether the given guild currently has premium status.
"""
row = await self.data.PremiumGuild.fetch(guildid)
now = utc_now()
premium = (row is not None) and row.premium_until and (row.premium_until > now)
return premium
@log_wrap(isolate=True)
async def _add_gems(self, userid: int, amount: int):
"""
Transaction helper method to atomically add `amount` gems to account `userid`,
creating the account if required.
Do not use this method for a gem transaction. Use `gem_transaction` instead.
"""
async with self.bot.db.connection() as conn:
self.bot.db.conn = conn
async with conn.transaction():
model = self.bot.core.data.User
rows = await model.table.update_where(userid=userid).set(gems=model.gems + amount)
if not rows:
# User does not exist, create it
if amount < 0:
raise BalanceTooLow
if amount > MAX_COINS:
raise BalanceTooHigh
row = (await model.create(userid=userid, gems=amount)).data
else:
row = rows[0]
if row['gems'] < 0:
raise BalanceTooLow
async def gem_transaction(
self,
transaction_type: GemTransactionType,
*,
actorid: int,
from_account: Optional[int], to_account: Optional[int],
amount: int, description: str,
note: Optional[str] = None, reference: Optional[str] = None,
) -> PremiumData.GemTransaction:
"""
Perform a gem transaction with the given parameters.
This atomically creates a row in the 'gem_transactions' table,
updates the account balances,
and posts in the gem audit log.
Parameters
----------
transaction_type: GemTransactionType
The type of transaction.
actorid: int
The userid of the actor who initiated this transaction.
Automatic actions (e.g. webhook triggered) may have their own unique id.
from_account: Optional[int]
The userid of the source account.
May be `None` if there is no source account (e.g. manual modification by admin).
to_account: Optional[int]
The userid of the destination account.
May be `None` if there is no destination account.
amount: int
The number of LionGems to transfer.
description: str
An informative description of the transaction for auditing purposes.
Should include the pathway (e.g. command) through which the transaction was executed.
note: Optional[str]
Optional user-readable note added by the actor.
Usually attached in a notification visible by the target.
(E.g. thanks message from system/admin, or note attached to gift.)
reference: str
Optional admin-readable transaction reference.
This may be the message link of a command message,
or an external id/reference for an automatic transaction.
Raises
------
BalanceTooLow:
Raised if either source or target account would go below 0.
"""
async with self.bot.db.connection() as conn:
self.bot.db.conn = conn
async with conn.transaction():
if from_account is not None:
await self._add_gems(from_account, -amount)
if to_account is not None:
await self._add_gems(to_account, amount)
row = await self.data.GemTransaction.create(
transaction_type=transaction_type,
actorid=actorid,
from_account=from_account,
to_account=to_account,
amount=amount,
description=description,
note=note,
reference=reference,
)
logger.info(
f"LionGem Transaction performed. Transaction data: {row!r}"
)
await self.audit_log(row)
return row
async def audit_log(self, row: PremiumData.GemTransaction):
"""
Log the provided gem transaction to the global gem audit log.
If this fails, or the audit log does not exist, logs a warning.
"""
posted = False
if self.gem_logger is not None:
embed = discord.Embed(
colour=discord.Colour.orange(),
title=f"Gem Transaction #{row.transactionid}",
timestamp=row._timestamp,
)
embed.add_field(name="Type", value=row.transaction_type.name)
embed.add_field(name="Amount", value=str(row.amount))
embed.add_field(name="Actor", value=f"<@{row.actorid}>")
embed.add_field(name="From Account", value=f"<@{row.from_account}>" if row.from_account else 'None')
embed.add_field(name="To Account", value=f"<@{row.to_account}>" if row.to_account else 'None')
embed.add_field(name='Description', value=str(row.description), inline=False)
if row.note:
embed.add_field(name='Note', value=str(row.note), inline=False)
if row.reference:
embed.add_field(name='Reference', value=str(row.reference), inline=False)
try:
await self.gem_logger.send(embed=embed)
posted = True
except discord.HTTPException:
pass
if not posted:
logger.warning(
f"Missed gem audit logging for gem transaction: {row!r}"
)
# ----- User Commands -----
@cmds.hybrid_command(
name=_p('cmd:free', "free"),
description=_p(
'cmd:free|desc',
"Get free LionGems!"
)
)
async def cmd_free(self, ctx: LionContext):
t = self.bot.translator.t
content = t(_p(
'cmd:free|embed|description',
"You can get free LionGems by sharing our project on your Discord server and social media!\n"
"If you have well-established, or YouTube, Instagram, and TikTok accounts,"
" we will reward you for creating videos and content about the bot.\n"
"If you have a big server, you can promote our project and get LionGems in return.\n"
"For more details, contact `arihoresh` or open a Ticket in the [support server](https://discord.gg/studylions)."
))
thumb = "https://cdn.discordapp.com/attachments/890619584158265405/972791204498530364/Untitled_design_44.png"
title = t(_p(
'cmd:free|embed|title',
"Get FREE LionGems!"
))
embed = discord.Embed(
title=title,
description=content,
colour=0x41f097
)
embed.set_thumbnail(url=thumb)
await ctx.reply(embed=embed)
@cmds.hybrid_command(
name=_p('cmd:gift', "gift"),
description=_p(
'cmd:gift|desc',
"Gift your LionGems to another user!"
)
)
@appcmds.rename(
user=_p('cmd:gift|param:user', "user"),
amount=_p('cmd:gift|param:amount', "amount"),
note=_p('cmd:gift|param:note', "note"),
)
@appcmds.describe(
user=_p(
'cmd:gift|param:user|desc',
"User to which you want to gift your LionGems."
),
amount=_p(
'cmd:gift|param:amount|desc',
"Number of LionGems to gift."
),
note=_p(
'cmd:gift|param:note|desc',
"Optional note to attach to your gift."
),
)
async def cmd_gift(self, ctx: LionContext,
user: discord.User,
amount: appcmds.Range[int, 1, MAX_COINS],
note: Optional[appcmds.Range[str, 1, 1024]] = None):
if not ctx.interaction:
return
t = self.bot.translator.t
# Validate target
if user.bot:
raise UserInputError(
t(_p(
'cmd:gift|error:target_bot',
"You cannot gift LionGems to bots!"
))
)
if user.id == ctx.author.id:
raise UserInputError(
t(_p(
'cmd:gift|error:target_is_author',
"You cannot gift LionGems to yourself!"
))
)
# Prepare and open gift confirmation modal
amount_field = TextInput(
label=t(_p(
'cmd:gift|modal:confirm|field:amount|label',
"Number of LionGems to Gift"
)),
default=str(amount),
required=True,
)
note_field = TextInput(
label=t(_p(
'cmd:gift|modal:confirm|field:note|label',
"Add an optional note to your gift"
)),
default=note or '',
required=False,
max_length=1024,
style=TextStyle.long,
)
modal = FastModal(
amount_field, note_field,
title=t(_p(
'cmd:gift|modal:confirm|title',
"Confirm LionGem Gift"
))
)
await ctx.interaction.response.send_modal(modal)
try:
interaction = await modal.wait_for(timeout=300)
except asyncio.TimeoutError:
# Presume user cancelled and wants to abort
raise SafeCancellation
await interaction.response.defer(thinking=False)
# Parse amount
amountstr = amount_field.value
if not amountstr.isdigit():
raise UserInputError(
t(_p(
'cmd:gift|error:parse_amount',
"Could not parse `{provided}` as a number!"
)).format(provided=amountstr)
)
amount = int(amountstr)
if amount == 0:
raise UserInputError(
t(_p(
'cmd:gift|error:amount_zero',
"Cannot gift `0` gems."
))
)
# Get author's balance, make sure they have enough
author_balance = await self.get_gem_balance(ctx.author.id)
if author_balance < amount:
raise UserInputError(
t(_p(
'cmd:gift|error:author_balance_too_low',
"Insufficient balance to send {gem}**{amount}**!\n"
"Current balance: {gem}**{balance}**"
)).format(
gem=self.bot.config.emojis.gem,
amount=amount,
balance=author_balance,
)
)
# Everything seems to be in order, run the transaction
try:
transaction = await self.gem_transaction(
GemTransactionType.GIFT,
actorid=ctx.author.id,
from_account=ctx.author.id, to_account=user.id,
amount=amount,
description="Gift given through command '/gift'",
note=note_field.value or None
)
except BalanceTooLow:
raise UserInputError(
t(_p(
'cmd:gift|error:balance_too_low',
"Insufficient Balance to complete gift!"
))
)
# Attempt to send note to user
thumb = "https://cdn.discordapp.com/attachments/925799205954543636/938704034578194443/C85AF926-9F75-466F-9D8E-D47721427F5D.png"
icon = "https://cdn.discordapp.com/attachments/925799205954543636/938703943683416074/4CF1C849-D532-4DEC-B4C9-0AB11F443BAB.png"
desc = t(_p(
'cmd:gift|target_msg|desc',
"You were just gifted {gem}**{amount}** by {user}!\n"
"To use them, use the command {skin_cmd} to change your graphics skin!"
)).format(
gem=self.bot.config.emojis.gem,
amount=amount,
user=ctx.author.mention,
skin_cmd=self.bot.core.mention_cmd('my skin'),
)
embed = discord.Embed(
description=desc,
colour=discord.Colour.orange()
)
embed.set_thumbnail(url=thumb)
embed.set_author(
name=t(_p('cmd:gift|target_msg|author:name', "LionGems Delivery!")),
icon_url=icon,
)
embed.set_footer(
text=t(_p(
'cmd:gift|target_msg|footer:text',
"You now have {balance} LionGems"
)).format(
balance=await self.get_gem_balance(user.id),
)
)
embed.timestamp = utc_now()
note = note_field.value
if note:
embed.add_field(
name=t(_p(
'cmd:gift|target_msg|field:note|name',
"The sender attached a note"
)),
value=note
)
notify_sent = False
try:
await user.send(embed=embed)
notify_sent = True
except discord.HTTPException:
logger.info(
f"Could not send LionGem gift target their gift notification. Transaction {transaction.transactionid}"
)
# Finally, send the ack back to the author
embed = discord.Embed(
colour=discord.Colour.brand_green(),
title=t(_p(
'cmd:gift|embed:success|title',
"Gift Sent!"
)),
description=t(_p(
'cmd:gift|embed:success|description',
"Your gift of {gem}**{amount}** is on its way to {target}!"
)).format(
gem=self.bot.config.emojis.gem,
amount=amount,
target=user.mention,
)
)
embed.set_footer(
text=t(_p(
'cmd:gift|embed:success|footer',
"New Balance: {balance} LionGems",
)).format(balance=await self.get_gem_balance(ctx.author.id))
)
if not notify_sent:
embed.add_field(
name="",
value=t(_p(
'cmd:gift|embed:success|field:notify_failed|value',
"Unfortunately, I couldn't tell them about it! "
"They might have direct messages with me turned off."
))
)
await ctx.reply(embed=embed, ephemeral=True)
@cmds.hybrid_command(
name=_p('cmd:premium', "premium"),
description=_p(
'cmd:premium|desc',
"Upgrade your server with LionGems!"
)
)
@appcmds.guild_only
async def cmd_premium(self, ctx: LionContext):
if not ctx.guild:
return
if not ctx.interaction:
return
ui = PremiumUI(self.bot, ctx.guild, ctx.luser, callerid=ctx.author.id)
await ui.run(ctx.interaction)
await ui.wait()
# ----- Owner Commands -----
@LionCog.placeholder_group
@cmds.hybrid_group("leo", with_app_command=False)
async def leo_group(self, ctx: LionContext):
...
@leo_group.command(
name=_p('cmd:leo_gems', "gems"),
description=_p(
'cmd:leo_gems|desc',
"View and adjust a user's LionGem balance."
)
)
@appcmds.rename(
target=_p('cmd:leo_gems|param:target', "target"),
adjustment=_p('cmd:leo_gems|param:adjustment', "adjustment"),
note=_p('cmd:leo_gems|param:note', "note"),
reason=_p('cmd:leo_gems|param:reason', "reason")
)
@appcmds.describe(
target=_p(
'cmd:leo_gems|param:target|desc',
"Target user you wish to view or modify LionGems for."
),
adjustment=_p(
'cmd:leo_gems|param:adjustment|desc',
"Number of LionGems to add to the target's balance (may be negative to remove)"
),
note=_p(
'cmd:leo_gems|param:note|desc',
"Optional note to attach to the delivery message when adding LionGems."
),
reason=_p(
'cmd:leo_gems|param:reason|desc',
'Optional reason or context to add to the gem audit log for this transaction.'
)
)
@sys_admin_ward
async def cmd_leo_gems(self, ctx: LionContext,
target: discord.User,
adjustment: Optional[int] = None,
note: Optional[appcmds.Range[str, 0, 1024]] = None,
reason: Optional[appcmds.Range[str, 0, 1024]] = None,):
if not ctx.interaction:
return
t = self.bot.translator.t
if adjustment is None or adjustment == 0:
# History viewing pathway
ui = TransactionList(self.bot, target.id, callerid=ctx.author.id)
await ui.run(ctx.interaction)
await ui.wait()
else:
# Adjustment path
# Show confirmation modal with note and reason
adjustment_field = TextInput(
label=t(_p(
'cmd:leo_gems|adjust|modal:confirm|field:amount|label',
"Number of LionGems to add. May be negative."
)),
default=str(adjustment),
required=True,
)
note_field = TextInput(
label=t(_p(
'cmd:leo_gems|adjust|modal:confirm|field:note|label',
"Optional note to attach to delivery message."
)),
default=note,
style=TextStyle.long,
max_length=1024,
required=False,
)
reason_field = TextInput(
label=t(_p(
'cmd:leo_gems|adjust|modal:confirm|field:reason|label',
"Optional reason to add to the audit log."
)),
default=reason,
style=TextStyle.long,
max_length=1024,
required=False,
)
modal = FastModal(
adjustment_field, note_field, reason_field,
title=t(_p(
'cmd:leo_gems|adjust|modal:confirm|title',
"Confirm LionGem Adjustment"
))
)
await ctx.interaction.response.send_modal(modal)
try:
interaction = await modal.wait_for(timeout=300)
except asyncio.TimeoutError:
raise SafeCancellation
await interaction.response.defer(thinking=False)
# Parse values
try:
amount = int(adjustment_field.value)
except ValueError:
raise UserInputError(
t(_p(
'cmd:leo_gems|adjust|error:parse_adjustment',
"Could not parse `{given}` as an integer."
)).format(given=adjustment_field.value)
)
note = note_field.value or None
reason = reason_field.value or None
# Run transaction
try:
transaction = await self.gem_transaction(
GemTransactionType.ADMIN,
actorid=ctx.author.id,
from_account=None, to_account=target.id,
amount=amount,
description=f"Admin balance adjustment with '/leo gems'.\n{reason}",
note=note
)
except GemTransactionFailed:
raise UserInputError(
t(_p(
'cmd:leo_gems|adjust|error:unknown',
"Balance adjustment failed! Check logs for more information."
))
)
# DM user with note if applicable
if amount > 0:
thumb = "https://cdn.discordapp.com/attachments/925799205954543636/938704034578194443/C85AF926-9F75-466F-9D8E-D47721427F5D.png"
icon = "https://cdn.discordapp.com/attachments/925799205954543636/938703943683416074/4CF1C849-D532-4DEC-B4C9-0AB11F443BAB.png"
desc = t(_p(
'cmd:leo_gems|adjust|target_msg|desc',
"You were given {gem}**{amount}**!\n"
"To use them, use the command {skin_cmd} to change your graphics skin!"
)).format(
gem=self.bot.config.emojis.gem,
amount=amount,
skin_cmd=self.bot.core.mention_cmd('my skin'),
)
embed = discord.Embed(
description=desc,
colour=discord.Colour.orange()
)
embed.set_thumbnail(url=thumb)
embed.set_author(
name=t(_p('cmd:leo_gems|adjust|target_msg|author:name', "LionGems Delivery!")),
icon_url=icon,
)
embed.set_footer(
text=t(_p(
'cmd:leo_gems|adjust|target_msg|footer:text',
"You now have {balance} LionGems"
)).format(
balance=await self.get_gem_balance(target.id),
)
)
embed.timestamp = utc_now()
note = note_field.value
if note:
embed.add_field(
name=t(_p(
'cmd:lion_gems|adjust|target_msg|field:note|name',
"Note"
)),
value=note
)
try:
await target.send(embed=embed)
target_notified = True
except discord.HTTPException:
target_notified = False
else:
target_notified = None
# Ack the operation
embed = discord.Embed(
colour=discord.Colour.brand_green(),
title=t(_p(
'cmd:lion_gems|adjust|embed:success|title',
"Success"
)),
description=t(_p(
'cmd:lion_gems|adjust|embed:success|description',
"Added {gem}**{amount}** to {target}'s account.\n"
"They now have {gem}**{balance}**"
)).format(
gem=self.bot.config.emojis.gem,
target=target.mention,
amount=amount,
balance=await self.get_gem_balance(target.id),
)
)
if target_notified is False:
embed.add_field(
name="",
value=t(_p(
'cmd:lion_gems|adjust|embed:success|field:notify_failed|value',
"Could not notify the target, they probably have direct messages disabled."
))
)
await ctx.reply(embed=embed, ephemeral=True)

View File

@@ -0,0 +1,92 @@
from enum import Enum
from psycopg import sql
from meta.logger import log_wrap
from data import Registry, RowModel, RegisterEnum, Table
from data.columns import Integer, Bool, Column, Timestamp, String
class GemTransactionType(Enum):
"""
Schema
------
CREATE TYPE GemTransactionType AS ENUM (
'ADMIN',
'GIFT',
'PURCHASE',
'AUTOMATIC'
);
"""
ADMIN = 'ADMIN',
GIFT = 'GIFT',
PURCHASE = 'PURCHASE',
AUTOMATIC = 'AUTOMATIC',
class PremiumData(Registry):
GemTransactionType = RegisterEnum(GemTransactionType, 'GemTransactionType')
class GemTransaction(RowModel):
"""
Schema
------
CREATE TABLE gem_transactions(
transactionid SERIAL PRIMARY KEY,
transaction_type GemTransactionType NOT NULL,
actorid BIGINT NOT NULL,
from_account BIGINT,
to_account BIGINT,
amount INTEGER NOT NULL,
description TEXT NOT NULL,
note TEXT,
reference TEXT,
_timestamp TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX gem_transactions_from ON gem_transactions (from_account);
"""
_tablename_ = 'gem_transactions'
transactionid = Integer(primary=True)
transaction_type: Column[GemTransactionType] = Column()
actorid = Integer()
from_account = Integer()
to_account = Integer()
amount = Integer()
description = String()
note = String()
reference = String()
_timestamp = Timestamp()
class PremiumGuild(RowModel):
"""
Schema
------
CREATE TABLE premium_guilds(
guildid BIGINT PRIMARY KEY REFERENCES guild_config,
premium_since TIMESTAMPTZ NOT NULL DEFAULT now(),
premium_until TIMESTAMPTZ NOT NULL DEFAULT now(),
custom_skin_id INTEGER REFERENCES customised_skins
);
"""
_tablename_ = "premium_guilds"
_cache_ = {}
guildid = Integer(primary=True)
premium_since = Timestamp()
premium_until = Timestamp()
custom_skin_id = Integer()
"""
CREATE TABLE premium_guild_contributions(
contributionid SERIAL PRIMARY KEY,
userid BIGINT NOT NULL REFERENCES user_config,
guildid BIGINT NOT NULL REFERENCES premium_guilds,
transactionid INTEGER REFERENCES gem_transactions,
duration INTEGER NOT NULL,
_timestamp TIMESTAMPTZ DEFAULT now()
);
"""
premium_guild_contributions = Table('premium_guild_contributions')

View File

@@ -0,0 +1,19 @@
class GemTransactionFailed(Exception):
"""
Base exception class used when a gem transaction failed.
"""
pass
class BalanceTooLow(GemTransactionFailed):
"""
Exception raised when transaction results in a negative gem balance.
"""
pass
class BalanceTooHigh(GemTransactionFailed):
"""
Exception raised when transaction results in gem balance overflow.
"""
pass

View File

@@ -0,0 +1,286 @@
from typing import Optional, TYPE_CHECKING, NamedTuple
import asyncio
import datetime as dt
import discord
from discord.ui.button import button, Button, ButtonStyle
from psycopg import sql
from meta import LionBot, conf
from meta.logger import log_wrap
from core.lion_user import LionUser
from babel.translator import LazyStr
from meta.errors import ResponseTimedOut, UserInputError
from data import RawExpr
from modules.premium.errors import BalanceTooLow
from utils.ui import MessageUI, Confirm, AButton
from utils.lib import MessageArgs, utc_now
from .. import babel, logger
from ..data import GemTransactionType, PremiumData
if TYPE_CHECKING:
from ..cog import PremiumCog
_p = babel._p
class PremiumPlan(NamedTuple):
text: LazyStr
label: LazyStr
emoji: Optional[discord.PartialEmoji | discord.Emoji | str]
duration: int
price: int
plans = [
PremiumPlan(
_p('plan:three_months|text', "three months"),
_p('plan:three_months|label', "Three Months"),
None,
90,
4000
),
PremiumPlan(
_p('plan:one_year|text', "one year"),
_p('plan:one_year|label', "One Year"),
None,
365,
12000
),
PremiumPlan(
_p('plan:one_month|text', "one month"),
_p('plan:one_month|label', "One Month"),
None,
30,
1500
),
]
class PremiumUI(MessageUI):
def __init__(self, bot: LionBot, guild: discord.Guild, luser: LionUser, **kwargs):
super().__init__(**kwargs)
self.bot = bot
self.guild = guild
self.luser = luser
self.cog: 'PremiumCog' = bot.get_cog('PremiumCog') # type: ignore
# UI State
self.premium_status: Optional[PremiumData.PremiumGuild] = None
self.plan_buttons = self._plan_buttons()
self.link_button = self.cog.buy_gems_button()
# ----- API -----
# ----- UI Components -----
async def plan_button(self, press: discord.Interaction, pressed: Button, plan: PremiumPlan):
t = self.bot.translator.t
# Check Balance
if self.luser.data.gems < plan.price:
raise UserInputError(
t(_p(
'ui:premium|button:plan|error:insufficient_gems',
"You do not have enough LionGems to purchase this plan!"
))
)
# Confirm Purchase
confirm_msg = t(_p(
'ui:premium|button:plan|confirm|desc',
"Contributing **{plan_text}** of premium subscription for this server"
" will cost you {gem}**{plan_price}**.\n"
"Are you sure you want to proceed?"
)).format(
plan_text=t(plan.text),
gem=self.bot.config.emojis.gem,
plan_price=plan.price,
)
confirm = Confirm(confirm_msg, press.user.id)
confirm.embed.title = t(_p(
'ui:premium|button:plan|confirm|title',
"Confirm Server Upgrade"
))
confirm.embed.set_footer(
text=t(_p(
'ui:premium|button:plan|confirm|footer',
"Your current balance is {balance} LionGems"
)).format(balance=self.luser.data.gems)
)
confirm.embed.colour = 0x41f097
try:
result = await confirm.ask(press, ephemeral=True)
except ResponseTimedOut:
result = False
if not result:
await press.followup.send(
t(_p(
'ui:premium|button:plan|confirm|cancelled',
"Purchase cancelled! No LionGems were deducted from your account."
)),
ephemeral=True
)
return
# Write transaction, plan contribution, and new plan status, with potential rollback
try:
await self._do_premium_upgrade(plan)
except BalanceTooLow:
raise UserInputError(
t(_p(
'ui:premium|button:plan|error:insufficient_gems_post_confirm',
"Insufficient LionGems to purchase this plan!"
))
)
# Acknowledge premium
embed = discord.Embed(
colour=discord.Colour.brand_green(),
title=t(_p(
'ui:premium|button:plan|embed:success|title',
"Server Upgraded!"
)),
description=t(_p(
'ui:premium|button:plan|embed:success|desc',
"You have contributed **{plan_text}** of premium subscription to this server!"
)).format(plan_text=plan.text)
)
await press.followup.send(
embed=embed
)
await self.refresh()
@log_wrap(action='premium upgrade')
async def _do_premium_upgrade(self, plan: PremiumPlan):
async with self.bot.db.connection() as conn:
self.bot.db.conn = conn
async with conn.transaction():
# Perform the gem transaction
transaction = await self.cog.gem_transaction(
GemTransactionType.PURCHASE,
actorid=self.luser.userid,
from_account=self.luser.userid,
to_account=None,
amount=plan.price,
description=(
f"User purchased {plan.duration} days of premium"
f" for guild {self.guild.id} using the `PremiumUI`."
),
note=None,
reference=f"iid: {self._original.id if self._original else 'None'}"
)
model = self.cog.data.PremiumGuild
# Ensure the premium guild row exists
premium_guild = await model.fetch_or_create(self.guild.id)
# Extend the subscription
await model.table.update_where(guildid=self.guild.id).set(
premium_until=RawExpr(
sql.SQL("GREATEST(premium_until, now()) + {}").format(
sql.Placeholder()
),
(dt.timedelta(days=plan.duration),)
)
)
# Finally, record the user's contribution
await self.cog.data.premium_guild_contributions.insert(
userid=self.luser.userid, guildid=self.guild.id,
transactionid=transaction.transactionid, duration=plan.duration
)
def _plan_buttons(self) -> list[Button]:
"""
Generate the Plan buttons.
Intended to be used once, upon initialisation.
"""
t = self.bot.translator.t
buttons = []
for plan in plans:
butt = AButton(
label=t(plan.label),
emoji=plan.emoji,
style=ButtonStyle.blurple,
pass_kwargs={'plan': plan}
)
butt(self.plan_button)
self.add_item(butt)
buttons.append(butt)
return buttons
# ----- UI Flow -----
def _current_status(self) -> str:
t = self.bot.translator.t
if self.premium_status is None or self.premium_status.premium_until is None:
status = t(_p(
'ui:premium|current_status:none',
"__**Current Server Status:**__ Awaiting Upgrade."
))
elif self.premium_status.premium_until > utc_now():
status = t(_p(
'ui:premium|current_status:premium',
"__**Current Server Status:**__ Upgraded! Premium until {expiry}"
)).format(expiry=discord.utils.format_dt(self.premium_status.premium_until, 'd'))
else:
status = t(_p(
'ui:premium|current_status:none',
"__**Current Server Status:**__ Awaiting Upgrade. Premium status expired on {expiry}"
)).format(expiry=discord.utils.format_dt(self.premium_status.premium_until, 'd'))
return status
async def make_message(self) -> MessageArgs:
t = self.bot.translator.t
blurb = t(_p(
'ui:premium|embed|description',
"By supporting our project, you will get access to countless customisation features!\n\n"
"- **Rebranding:** Customizable HEX colours and"
" **beautiful premium skins** for all of your community members!\n"
"- **Remove the vote and sponsor prompt!**\n"
"- Access to all of the [future premium features](https://staging.lionbot.org/donate)\n\n"
"Both server owners and **regular users** can"
" **buy and gift a subscription for this server** using this command!\n"
"To support both Leo and your server, **use the buttons below**!"
)) + '\n\n' + self._current_status()
embed = discord.Embed(
colour=0x41f097,
title=t(_p(
'ui:premium|embed|title',
"Support Leo and Upgrade your Server!"
)),
description=blurb,
)
embed.set_thumbnail(
url="https://i.imgur.com/v1mZolL.png"
)
embed.set_image(
url="https://cdn.discordapp.com/attachments/824196406482305034/972405513570615326/premium_test.png"
)
embed.set_footer(
text=t(_p(
'ui:premium|embed|footer',
"Your current balance is {balance} LionGems."
)).format(balance=self.luser.data.gems)
)
return MessageArgs(embed=embed)
async def refresh_layout(self):
self.set_layout(
(*self.plan_buttons, self.link_button),
)
async def reload(self):
self.premium_status = await self.cog.data.PremiumGuild.fetch(self.guild.id, cached=False)
await self.luser.data.refresh()

View File

@@ -0,0 +1,198 @@
from typing import Optional
import asyncio
import datetime as dt
import discord
from discord.ui.button import button, Button, ButtonStyle
from meta import LionBot, conf
from data import ORDER
from utils.ui import MessageUI, input
from utils.lib import MessageArgs, tabulate
from .. import babel, logger
from ..data import PremiumData
_p = babel._p
class TransactionList(MessageUI):
block_len = 5
def __init__(self, bot: LionBot, userid: int, **kwargs):
super().__init__(**kwargs)
self.bot = bot
self.userid = userid
self._pagen = 0
self.blocks: list[list[PremiumData.GemTransaction]] = [[]]
@property
def page_count(self):
return len(self.blocks)
@property
def pagen(self):
self._pagen = self._pagen % self.page_count
return self._pagen
@pagen.setter
def pagen(self, value):
self._pagen = value % self.page_count
@property
def current_page(self):
return self.blocks[self.pagen]
# ----- UI Components -----
# Backwards
@button(emoji=conf.emojis.backward, style=ButtonStyle.grey)
async def prev_button(self, press: discord.Interaction, pressed: Button):
await press.response.defer(thinking=True, ephemeral=True)
self.pagen -= 1
await self.refresh(thinking=press)
# Jump to page
@button(label="JUMP_PLACEHOLDER", style=ButtonStyle.blurple)
async def jump_button(self, press: discord.Interaction, pressed: Button):
"""
Jump-to-page button.
Loads a page-switch dialogue.
"""
t = self.bot.translator.t
try:
interaction, value = await input(
press,
title=t(_p(
'ui:transactions|button:jump|input:title',
"Jump to page"
)),
question=t(_p(
'ui:transactions|button:jump|input:question',
"Page number to jump to"
))
)
value = value.strip()
except asyncio.TimeoutError:
return
if not value.lstrip('- ').isdigit():
error_embed = discord.Embed(
title=t(_p(
'ui:transactions|button:jump|error:invalid_page',
"Invalid page number, please try again!"
)),
colour=discord.Colour.brand_red()
)
await interaction.response.send_message(embed=error_embed, ephemeral=True)
else:
await interaction.response.defer(thinking=True)
pagen = int(value.lstrip('- '))
if value.startswith('-'):
pagen = -1 * pagen
elif pagen > 0:
pagen = pagen - 1
self.pagen = pagen
await self.refresh(thinking=interaction)
async def jump_button_refresh(self):
component = self.jump_button
component.label = f"{self.pagen + 1}/{self.page_count}"
component.disabled = (self.page_count <= 1)
# Forward
@button(emoji=conf.emojis.forward, style=ButtonStyle.grey)
async def next_button(self, press: discord.Interaction, pressed: Button):
await press.response.defer(thinking=True)
self.pagen += 1
await self.refresh(thinking=press)
# Quit
@button(emoji=conf.emojis.cancel, style=ButtonStyle.red)
async def quit_button(self, press: discord.Interaction, pressed: Button):
"""
Quit the UI.
"""
await press.response.defer()
await self.quit()
# ----- UI Flow -----
async def make_message(self) -> MessageArgs:
t = self.bot.translator.t
title = t(_p(
'ui:transactions|embed|title',
"Gem Transactions for user `{userid}`"
)).format(userid=self.userid)
rows = self.current_page
if rows:
embed = discord.Embed(
colour=discord.Colour.orange(),
title=title,
description=t(_p(
'ui:transactions|embed|desc:balance',
"User {target} has a LionGem balance of {gem}**{balance}**"
)).format(
gem=self.bot.config.emojis.gem,
target=f"<@{self.userid}>",
balance=await (self.bot.get_cog('PremiumCog')).get_gem_balance(self.userid),
)
)
for row in rows:
name = f"Transaction #{row.transactionid}"
table_rows = (
('timestamp', discord.utils.format_dt(row._timestamp)),
('type', row.transaction_type.name),
('amount', str(row.amount)),
('actor', f"<@{row.actorid}>"),
('from', f"`{row.from_account}`" if row.from_account else 'None'),
('to', f"`{row.to_account}`" if row.to_account else 'None'),
('reference', str(row.reference)),
)
table = '\n'.join(tabulate(*table_rows))
embed.add_field(
name=name,
value=f"{row.description}\n{table}",
inline=False
)
else:
embed = discord.Embed(
colour=discord.Colour.brand_red(),
description = t(_p(
'ui:transactions|embed|desc:no_transactions',
"This user has no related gem transactions!"
))
)
return MessageArgs(embed=embed)
async def refresh_layout(self):
to_refresh = (
self.jump_button_refresh(),
)
await asyncio.gather(*to_refresh)
if self.page_count > 1:
self.set_layout(
(self.prev_button, self.jump_button, self.quit_button, self.next_button),
)
else:
self.set_layout(
(self.quit_button,)
)
async def reload(self):
model = PremiumData.GemTransaction
rows = await model.fetch_where(
(model.from_account == self.userid) | (model.to_account == self.userid)
).order_by('_timestamp', ORDER.DESC)
blocks = [
rows[i:i+self.block_len]
for i in range(0, len(rows), self.block_len)
]
self.blocks = blocks or [[]]

View File

@@ -36,9 +36,13 @@ class SponsorCog(LionCog):
"""
if not interaction.is_expired():
# TODO: caching
whitelist = (await self.settings.Whitelist.get(self.bot.appname)).value
if interaction.guild and interaction.guild.id in whitelist:
return
if interaction.guild:
whitelist = (await self.settings.Whitelist.get(self.bot.appname)).value
if interaction.guild.id in whitelist:
return
premiumcog = self.bot.get_cog('PremiumCog')
if premiumcog and await premiumcog.is_premium_guild(interaction.guild.id):
return
setting = await self.settings.SponsorPrompt.get(self.bot.appname)
value = setting.value
if value:

View File

@@ -537,10 +537,11 @@ class LeaderboardUI(StatsUI):
period_row,
page_row
]
voting = self.bot.get_cog('TopggCog')
if voting and not await voting.check_voted_recently(self.userid):
self._layout.append((voting.vote_button(),))
premiumcog = self.bot.get_cog('PremiumCog')
if not (premiumcog and await premiumcog.is_premium_guild(self.guild.id)):
self._layout.append((voting.vote_button(),))
async def reload(self):
"""

View File

@@ -299,7 +299,9 @@ class ProfileUI(StatsUI):
voting = self.bot.get_cog('TopggCog')
if voting and not await voting.check_voted_recently(self.userid):
self._layout.append((voting.vote_button(),))
premiumcog = self.bot.get_cog('PremiumCog')
if not (premiumcog and await premiumcog.is_premium_guild(self.guild.id)):
self._layout.append((voting.vote_button(),))
async def _render_stats(self):
"""

View File

@@ -753,7 +753,9 @@ class WeeklyMonthlyUI(StatsUI):
voting = self.bot.get_cog('TopggCog')
if voting and not await voting.check_voted_recently(self.userid):
self._layout.append((voting.vote_button(),))
premiumcog = self.bot.get_cog('PremiumCog')
if not (premiumcog and await premiumcog.is_premium_guild(self.guild.id)):
self._layout.append((voting.vote_button(),))
if self._showing_selector:
await self.period_menu_refresh()

View File

@@ -431,7 +431,7 @@ class Exec(LionCog):
results = [
appcmd.Choice(name=f"No extensions found matching {partial}", value="None")
]
return results
return results[:25]
@commands.hybrid_command(
name=_('shutdown'),