feat(premium): Implement gem API and admin.

This commit is contained in:
2023-10-26 09:50:07 +03:00
parent 90c86460b6
commit c41374bbaa
8 changed files with 1026 additions and 1 deletions

705
src/modules/premium/cog.py Normal file
View File

@@ -0,0 +1,705 @@
from typing import Optional
import asyncio
import discord
from discord.ext import commands as cmds
import discord.app_commands as appcmds
from discord.ui.button import Button, ButtonStyle
from discord.ui.text_input import TextInput, TextStyle
from meta import LionCog, LionBot, LionContext
from meta.errors import SafeCancellation, UserInputError
from meta.logger import log_wrap
from utils.lib import utc_now
from utils.ui import FastModal
from wards import sys_admin_ward
from constants import MAX_COINS
from . import logger, babel
from .data import PremiumData, GemTransactionType
from .ui.transactions import TransactionList
from .errors import GemTransactionFailed, BalanceTooLow, BalanceTooHigh
_p = babel._p
class PremiumCog(LionCog):
buy_gems_link = "https://lionbot.org/donate"
def __init__(self, bot: LionBot):
self.bot = bot
self.data: PremiumData = bot.db.load_registry(PremiumData())
self.gem_logger: Optional[discord.Webhook] = None
async def cog_load(self):
await self.data.init()
if (leo_setting_cog := self.bot.get_cog('LeoSettings')) is not None:
self.crossload_group(self.leo_group, leo_setting_cog.leo_group)
if (gem_log_url := self.bot.config.endpoints.get('gem_log', None)) is not None:
self.gem_logger = discord.Webhook.from_url(gem_log_url, session=self.bot.web_client)
# ----- API -----
def buy_gems_buttons(self) -> Button:
t = self.bot.translator.t
button = Button(
style=ButtonStyle.link,
label=t(_p(
'button:gems|label',
"Buy Gems"
)),
emoji=self.bot.config.emojis.gem,
url=self.buy_gems_link,
)
return button
async def get_gem_balance(self, userid: int) -> int:
"""
Get the up-to-date gem balance for this user.
Creates the User row if it does not already exist.
"""
record = await self.bot.core.data.User.fetch(userid, cached=False)
if record is None:
record = await self.bot.core.data.User.create(userid=userid)
return record.gems
async def get_gift_count(self, userid: int) -> int:
"""
Compute the number of gifts this user has sent, by counting Transaction rows.
"""
record = await self.data.GemTransaction.table.select_where(
from_account=userid,
transaction_type=GemTransactionType.GIFT,
).select(
gift_count='COUNT(*)'
).with_no_adapter()
return record[0]['gift_count'] or 0
async def is_premium_guild(self, guildid: int) -> bool:
"""
Check whether the given guild currently has premium status.
"""
row = await self.data.PremiumGuild.fetch(guildid)
now = utc_now()
premium = (row is not None) and (row.premium_until > now)
return premium
@log_wrap(isolate=True)
async def _add_gems(self, userid: int, amount: int):
"""
Transaction helper method to atomically add `amount` gems to account `userid`,
creating the account if required.
Do not use this method for a gem transaction. Use `gem_transaction` instead.
"""
async with self.bot.db.connection() as conn:
self.bot.db.conn = conn
async with conn.transaction():
model = self.bot.core.data.User
rows = await model.table.update_where(userid=userid).set(gems=model.gems + amount)
if not rows:
# User does not exist, create it
if amount < 0:
raise BalanceTooLow
if amount > MAX_COINS:
raise BalanceTooHigh
row = (await model.create(userid=userid, gems=amount)).data
else:
row = rows[0]
if row['gems'] < 0:
raise BalanceTooLow
async def gem_transaction(
self,
transaction_type: GemTransactionType,
*,
actorid: int,
from_account: Optional[int], to_account: Optional[int],
amount: int, description: str,
note: Optional[str] = None, reference: Optional[str] = None,
) -> PremiumData.GemTransaction:
"""
Perform a gem transaction with the given parameters.
This atomically creates a row in the 'gem_transactions' table,
updates the account balances,
and posts in the gem audit log.
Parameters
----------
transaction_type: GemTransactionType
The type of transaction.
actorid: int
The userid of the actor who initiated this transaction.
Automatic actions (e.g. webhook triggered) may have their own unique id.
from_account: Optional[int]
The userid of the source account.
May be `None` if there is no source account (e.g. manual modification by admin).
to_account: Optional[int]
The userid of the destination account.
May be `None` if there is no destination account.
amount: int
The number of LionGems to transfer.
description: str
An informative description of the transaction for auditing purposes.
Should include the pathway (e.g. command) through which the transaction was executed.
note: Optional[str]
Optional user-readable note added by the actor.
Usually attached in a notification visible by the target.
(E.g. thanks message from system/admin, or note attached to gift.)
reference: str
Optional admin-readable transaction reference.
This may be the message link of a command message,
or an external id/reference for an automatic transaction.
Raises
------
BalanceTooLow:
Raised if either source or target account would go below 0.
"""
async with self.bot.db.connection() as conn:
self.bot.db.conn = conn
async with conn.transaction():
if from_account is not None:
await self._add_gems(from_account, -amount)
if to_account is not None:
await self._add_gems(to_account, amount)
row = await self.data.GemTransaction.create(
transaction_type=transaction_type,
actorid=actorid,
from_account=from_account,
to_account=to_account,
amount=amount,
description=description,
note=note,
reference=reference,
)
logger.info(
f"LionGem Transaction performed. Transaction data: {row!r}"
)
await self.audit_log(row)
return row
async def audit_log(self, row: PremiumData.GemTransaction):
"""
Log the provided gem transaction to the global gem audit log.
If this fails, or the audit log does not exist, logs a warning.
"""
posted = False
if self.gem_logger is not None:
embed = discord.Embed(
colour=discord.Colour.orange(),
title=f"Gem Transaction #{row.transactionid}",
timestamp=row._timestamp,
)
embed.add_field(name="Type", value=row.transaction_type.name)
embed.add_field(name="Amount", value=str(row.amount))
embed.add_field(name="Actor", value=f"<@{row.actorid}>")
embed.add_field(name="From Account", value=f"<@{row.from_account}>" if row.from_account else 'None')
embed.add_field(name="To Account", value=f"<@{row.to_account}>" if row.to_account else 'None')
embed.add_field(name='Description', value=str(row.description), inline=False)
if row.note:
embed.add_field(name='Note', value=str(row.note), inline=False)
if row.reference:
embed.add_field(name='Reference', value=str(row.reference), inline=False)
try:
await self.gem_logger.send(embed=embed)
except discord.HTTPException:
pass
if not posted:
logger.warning(
f"Missed gem audit logging for gem transaction: {row!r}"
)
# ----- User Commands -----
@cmds.hybrid_command(
name=_p('cmd:free', "free"),
description=_p(
'cmd:free|desc',
"Get free LionGems!"
)
)
async def cmd_free(self, ctx: LionContext):
t = self.bot.translator.t
content = t(_p(
'cmd:free|embed|description',
"You can get free LionGems by sharing our project on your Discord server and social media!\n"
"If you have well-established, or YouTube, Instagram, and TikTok accounts,"
" we will reward you for creating videos and content about the bot.\n"
"If you have a big server, you can promote our project and get LionGems in return.\n"
"For more details, contact `arihoresh` or open a Ticket in the [support server](https://discord.gg/studylions)."
))
thumb = "https://cdn.discordapp.com/attachments/890619584158265405/972791204498530364/Untitled_design_44.png"
title = t(_p(
'cmd:free|embed|title',
"Get FREE LionGems!"
))
embed = discord.Embed(
title=title,
description=content,
colour=0x41f097
)
embed.set_thumbnail(url=thumb)
await ctx.reply(embed=embed)
@cmds.hybrid_command(
name=_p('cmd:gift', "gift"),
description=_p(
'cmd:gift|desc',
"Gift your LionGems to another user!"
)
)
@appcmds.rename(
user=_p('cmd:gift|param:user', "user"),
amount=_p('cmd:gift|param:amount', "amount"),
note=_p('cmd:gift|param:note', "note"),
)
@appcmds.describe(
user=_p(
'cmd:gift|param:user|desc',
"User to which you want to gift your LionGems."
),
amount=_p(
'cmd:gift|param:amount|desc',
"Number of LionGems to gift."
),
note=_p(
'cmd:gift|param:note|desc',
"Optional note to attach to your gift."
),
)
async def cmd_gift(self, ctx: LionContext,
user: discord.User,
amount: appcmds.Range[int, 1, MAX_COINS],
note: Optional[appcmds.Range[str, 1, 1024]] = None):
if not ctx.interaction:
return
t = self.bot.translator.t
# Validate target
if user.bot:
raise UserInputError(
t(_p(
'cmd:gift|error:target_bot',
"You cannot gift LionGems to bots!"
))
)
if user.id == ctx.author.id:
raise UserInputError(
t(_p(
'cmd:gift|error:target_is_author',
"You cannot gift LionGems to yourself!"
))
)
# Prepare and open gift confirmation modal
amount_field = TextInput(
label=t(_p(
'cmd:gift|modal:confirm|field:amount|label',
"Number of LionGems to Gift"
)),
default=str(amount),
required=True,
)
note_field = TextInput(
label=t(_p(
'cmd:gift|modal:confirm|field:note|label',
"Add an optional note to your gift"
)),
default=note or '',
required=False,
max_length=1024,
style=TextStyle.long,
)
modal = FastModal(
amount_field, note_field,
title=t(_p(
'cmd:gift|modal:confirm|title',
"Confirm LionGem Gift"
))
)
await ctx.interaction.response.send_modal(modal)
try:
interaction = await modal.wait_for(timeout=300)
except asyncio.TimeoutError:
# Presume user cancelled and wants to abort
raise SafeCancellation
await interaction.response.defer(thinking=False)
# Parse amount
amountstr = amount_field.value
if not amountstr.isdigit():
raise UserInputError(
t(_p(
'cmd:gift|error:parse_amount',
"Could not parse `{provided}` as a number!"
)).format(provided=amountstr)
)
amount = int(amountstr)
if amount == 0:
raise UserInputError(
t(_p(
'cmd:gift|error:amount_zero',
"Cannot gift `0` gems."
))
)
# Get author's balance, make sure they have enough
author_balance = await self.get_gem_balance(ctx.author.id)
if author_balance < amount:
raise UserInputError(
t(_p(
'cmd:gift|error:author_balance_too_low',
"Insufficient balance to send {gem}**{amount}**!\n"
"Current balance: {gem}**{balance}**"
)).format(
gem=self.bot.config.emojis.gem,
amount=amount,
balance=author_balance,
)
)
# Everything seems to be in order, run the transaction
try:
transaction = await self.gem_transaction(
GemTransactionType.GIFT,
actorid=ctx.author.id,
from_account=ctx.author.id, to_account=user.id,
amount=amount,
description="Gift given through command '/gift'",
note=note_field.value or None
)
except BalanceTooLow:
raise UserInputError(
t(_p(
'cmd:gift|error:balance_too_low',
"Insufficient Balance to complete gift!"
))
)
# Attempt to send note to user
thumb = "https://cdn.discordapp.com/attachments/925799205954543636/938704034578194443/C85AF926-9F75-466F-9D8E-D47721427F5D.png"
icon = "https://cdn.discordapp.com/attachments/925799205954543636/938703943683416074/4CF1C849-D532-4DEC-B4C9-0AB11F443BAB.png"
desc = t(_p(
'cmd:gift|target_msg|desc',
"You were just gifted {gem}**{amount}** by {user}!\n"
"To use them, use the command {skin_cmd} to change your graphics skin!"
)).format(
gem=self.bot.config.emojis.gem,
amount=amount,
user=ctx.author.mention,
skin_cmd=self.bot.core.mention_cmd('my skin'),
)
embed = discord.Embed(
description=desc,
colour=discord.Colour.orange()
)
embed.set_thumbnail(url=thumb)
embed.set_author(
name=t(_p('cmd:gift|target_msg|author:name', "LionGems Delivery!")),
icon_url=icon,
)
embed.set_footer(
text=t(_p(
'cmd:gift|target_msg|footer:text',
"You now have {balance} LionGems"
)).format(
balance=await self.get_gem_balance(user.id),
)
)
embed.timestamp = utc_now()
note = note_field.value
if note:
embed.add_field(
name=t(_p(
'cmd:gift|target_msg|field:note|name',
"The sender attached a note"
)),
value=note
)
notify_sent = False
try:
await user.send(embed=embed)
notify_sent = True
except discord.HTTPException:
logger.info(
f"Could not send LionGem gift target their gift notification. Transaction {transaction.transactionid}"
)
# Finally, send the ack back to the author
embed = discord.Embed(
colour=discord.Colour.brand_green(),
title=t(_p(
'cmd:gift|embed:success|title',
"Gift Sent!"
)),
description=t(_p(
'cmd:gift|embed:success|description',
"Your gift of {gem}**{amount}** is on its way to {target}!"
)).format(
gem=self.bot.config.emojis.gem,
amount=amount,
target=user.mention,
)
)
embed.set_footer(
text=t(_p(
'cmd:gift|embed:success|footer',
"New Balance: {balance} LionGems",
)).format(balance=await self.get_gem_balance(ctx.author.id))
)
if not notify_sent:
embed.add_field(
name="",
value=t(_p(
'cmd:gift|embed:success|field:notify_failed|value',
"Unfortunately, I couldn't tell them about it! "
"They might have direct messages with me turned off."
))
)
await ctx.reply(embed=embed, ephemeral=True)
@cmds.hybrid_command(
name=_p('cmd:premium', "premium"),
description=_p(
'cmd:premium|desc',
"Upgrade your server with LionGems!"
)
)
async def cmd_premium(self, ctx: LionContext):
# TODO
...
# ----- Owner Commands -----
@LionCog.placeholder_group
@cmds.hybrid_group("leo", with_app_command=False)
async def leo_group(self, ctx: LionContext):
...
@leo_group.command(
name=_p('cmd:leo_gems', "gems"),
description=_p(
'cmd:leo_gems|desc',
"View and adjust a user's LionGem balance."
)
)
@appcmds.rename(
target=_p('cmd:leo_gems|param:target', "target"),
adjustment=_p('cmd:leo_gems|param:adjustment', "adjustment"),
note=_p('cmd:leo_gems|param:note', "note"),
reason=_p('cmd:leo_gems|param:reason', "reason")
)
@appcmds.describe(
target=_p(
'cmd:leo_gems|param:target|desc',
"Target user you wish to view or modify LionGems for."
),
adjustment=_p(
'cmd:leo_gems|param:adjustment|desc',
"Number of LionGems to add to the target's balance (may be negative to remove)"
),
note=_p(
'cmd:leo_gems|param:note|desc',
"Optional note to attach to the delivery message when adding LionGems."
),
reason=_p(
'cmd:leo_gems|param:reason|desc',
'Optional reason or context to add to the gem audit log for this transaction.'
)
)
@sys_admin_ward
async def cmd_leo_gems(self, ctx: LionContext,
target: discord.User,
adjustment: Optional[int] = None,
note: Optional[appcmds.Range[str, 0, 1024]] = None,
reason: Optional[appcmds.Range[str, 0, 1024]] = None,):
if not ctx.interaction:
return
t = self.bot.translator.t
if adjustment is None or adjustment == 0:
# History viewing pathway
ui = TransactionList(self.bot, target.id, callerid=ctx.author.id)
await ui.run(ctx.interaction)
await ui.wait()
else:
# Adjustment path
# Show confirmation modal with note and reason
adjustment_field = TextInput(
label=t(_p(
'cmd:leo_gems|adjust|modal:confirm|field:amount|label',
"Number of LionGems to add. May be negative."
)),
default=str(adjustment),
required=True,
)
note_field = TextInput(
label=t(_p(
'cmd:leo_gems|adjust|modal:confirm|field:note|label',
"Optional note to attach to delivery message."
)),
default=note,
style=TextStyle.long,
max_length=1024,
required=False,
)
reason_field = TextInput(
label=t(_p(
'cmd:leo_gems|adjust|modal:confirm|field:reason|label',
"Optional reason to add to the audit log."
)),
default=reason,
style=TextStyle.long,
max_length=1024,
required=False,
)
modal = FastModal(
adjustment_field, note_field, reason_field,
title=t(_p(
'cmd:leo_gems|adjust|modal:confirm|title',
"Confirm LionGem Adjustment"
))
)
await ctx.interaction.response.send_modal(modal)
try:
interaction = await modal.wait_for(timeout=300)
except asyncio.TimeoutError:
raise SafeCancellation
await interaction.response.defer(thinking=False)
# Parse values
try:
amount = int(adjustment_field.value)
except ValueError:
raise UserInputError(
t(_p(
'cmd:leo_gems|adjust|error:parse_adjustment',
"Could not parse `{given}` as an integer."
)).format(given=adjustment_field.value)
)
note = note_field.value or None
reason = reason_field.value or None
# Run transaction
try:
transaction = await self.gem_transaction(
GemTransactionType.ADMIN,
actorid=ctx.author.id,
from_account=None, to_account=target.id,
amount=amount,
description=f"Admin balance adjustment with '/leo gems'.\n{reason}",
note=note
)
except GemTransactionFailed:
raise UserInputError(
t(_p(
'cmd:leo_gems|adjust|error:unknown',
"Balance adjustment failed! Check logs for more information."
))
)
# DM user with note if applicable
if amount > 0:
thumb = "https://cdn.discordapp.com/attachments/925799205954543636/938704034578194443/C85AF926-9F75-466F-9D8E-D47721427F5D.png"
icon = "https://cdn.discordapp.com/attachments/925799205954543636/938703943683416074/4CF1C849-D532-4DEC-B4C9-0AB11F443BAB.png"
desc = t(_p(
'cmd:leo_gems|adjust|target_msg|desc',
"You were given {gem}**{amount}**!\n"
"To use them, use the command {skin_cmd} to change your graphics skin!"
)).format(
gem=self.bot.config.emojis.gem,
amount=amount,
skin_cmd=self.bot.core.mention_cmd('my skin'),
)
embed = discord.Embed(
description=desc,
colour=discord.Colour.orange()
)
embed.set_thumbnail(url=thumb)
embed.set_author(
name=t(_p('cmd:leo_gems|adjust|target_msg|author:name', "LionGems Delivery!")),
icon_url=icon,
)
embed.set_footer(
text=t(_p(
'cmd:leo_gems|adjust|target_msg|footer:text',
"You now have {balance} LionGems"
)).format(
balance=await self.get_gem_balance(target.id),
)
)
embed.timestamp = utc_now()
note = note_field.value
if note:
embed.add_field(
name=t(_p(
'cmd:lion_gems|adjust|target_msg|field:note|name',
"Note"
)),
value=note
)
try:
await target.send(embed=embed)
target_notified = True
except discord.HTTPException:
target_notified = False
else:
target_notified = None
# Ack the operation
embed = discord.Embed(
colour=discord.Colour.brand_green(),
title=t(_p(
'cmd:lion_gems|adjust|embed:success|title',
"Success"
)),
description=t(_p(
'cmd:lion_gems|adjust|embed:success|description',
"Added {gem}**{amount}** to {target}'s account.\n"
"They now have {gem}**{balance}**"
)).format(
gem=self.bot.config.emojis.gem,
target=target.mention,
amount=amount,
balance=await self.get_gem_balance(target.id),
)
)
if target_notified is False:
embed.add_field(
name="",
value=t(_p(
'cmd:lion_gems|adjust|embed:success|field:notify_failed|value',
"Could not notify the target, they probably have direct messages disabled."
))
)
await ctx.reply(embed=embed, ephemeral=True)