Files
adventures/src/routes/users.py

462 lines
17 KiB
Python

"""
- `/users` with `POST`, `GET`, `PATCH`, `DELETE`
- `/users/{user_id}` with `GET`, `PATCH`, `DELETE`
- `/users/{user_id}/events` which is passed to `/events`
- `/users/{user_id}/specimen` which is passed to `/specimens/{specimen_id}`
- `/users/{user_id}/specimens` which is passed to `/specimens`
- `/users/{user_id}/wallet` with `GET`
- `/users/{user_id}/transactions` which is passed to `/transactions`
"""
import logging
from datetime import datetime
from typing import Any, Literal, NamedTuple, Optional, Self, TypedDict, Unpack, overload, reveal_type, List
from aiohttp import web
import discord
from data import Condition, condition
from data.conditions import NULL
from data.queries import JOINTYPE
from datamodels import DataModel
from modules.profiles.data import ProfileData
from utils.lib import MessageArgs, tabulate
from .lib import ModelField, datamodelsv, dbvar, event_log, profiledatav
from .specimens import Specimen, SpecimenPayload
# Route table that every view and handler in this module registers itself on.
routes = web.RouteTableDef()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class UserPayload(TypedDict):
user_id: int
twitch_id: Optional[str]
name: Optional[str]
preferences: Optional[str]
created_at: str
class UserDetailsPayload(UserPayload):
    """``UserPayload`` extended with the user's specimen, inventory and wallet."""

    # Payload of the user's active specimen, or None if they have none.
    specimen: SpecimenPayload | None
    inventory: List  # TODO
    wallet: int
class UserCreateParamsReq(TypedDict):
    """Keyword arguments that must always be supplied to ``User.create``."""

    twitch_id: str
    name: str
class UserCreateParams(UserCreateParamsReq, total=False):
    """All keyword arguments accepted by ``User.create``; keys added here are optional."""

    preferences: str
class UserEditParams(TypedDict, total=False):
name: Optional[str]
preferences: Optional[str]
# Field schema of the User resource.
# NOTE(review): the three positional flags look like (required, can_create, can_edit),
# judging by the attribute names read below -- confirm against ModelField.
fields = [
    ModelField('user_id', int, False, False, False),
    ModelField('twitch_id', str, True, True, False),
    ModelField('name', str, True, True, True),
    ModelField('preferences', str, False, True, True),
    ModelField('created_at', str, False, False, False),
]

# Names of the fields that must be present when creating a user.
req_fields = {spec.name for spec in fields if spec.required}
# Names of the fields that may be changed on an existing user.
edit_fields = {spec.name for spec in fields if spec.can_edit}
# Names of the fields that may be supplied when creating a user.
create_fields = {spec.name for spec in fields if spec.can_create}
class User:
    """
    API-level wrapper around a single ``Dreamer`` row.

    Bundles the row with the application's data registries and implements
    creating, querying, editing, deleting and serialising users.
    """

    def __init__(self, app: web.Application, row: DataModel.Dreamer):
        self.app = app
        self.data = app[datamodelsv]
        self.profile_data = app[profiledatav]
        self.row = row

        # Filled in lazily by get_prefs().
        self._pref_row: Optional[DataModel.UserPreferences] = None

    async def get_prefs(self) -> DataModel.UserPreferences:
        """Return this user's UserPreferences row, fetching or creating it on first use."""
        if self._pref_row is None:
            self._pref_row = await self.data.UserPreferences.fetch_or_create(self.row.user_id)
        return self._pref_row

    @classmethod
    async def validate_create_params(cls, params):
        """
        Check that ``params`` only holds creatable keys and has every required key.

        Raises ``web.HTTPBadRequest`` naming the first offending key.
        """
        # Explicit loops rather than a walrus on the key itself,
        # so that a falsy key such as '' is still rejected.
        for key in params:
            if key not in create_fields:
                raise web.HTTPBadRequest(text=f"Invalid key '{key}' passed to user creation.")
        for key in req_fields:
            if key not in params:
                raise web.HTTPBadRequest(text=f"User params missing required key '{key}'")

    @classmethod
    async def fetch_from_id(cls, app: web.Application, user_id: int):
        """Return the User with the given id, or None if no Dreamer row was fetched."""
        data = app[datamodelsv]
        row = await data.Dreamer.fetch(int(user_id))
        return cls(app, row) if row is not None else None

    @staticmethod
    def _parse_query_timestamp(value: str, key: str) -> datetime:
        """Parse an ISO 8601 filter value, raising HTTPBadRequest when it is malformed."""
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            raise web.HTTPBadRequest(text=f"Invalid '{key}' passed to user query!") from None

    @classmethod
    async def query(
        cls,
        app: web.Application,
        user_id: Optional[str] = None,
        twitch_id: Optional[str] = None,
        name: Optional[str] = None,
        created_before: Optional[str] = None,
        created_after: Optional[str] = None,
    ) -> List[Self]:
        """
        Fetch the users matching every provided filter, ordered by creation time.

        Filters left as None are ignored.
        ``created_before`` and ``created_after`` are ISO 8601 strings (inclusive bounds).

        Raises ``web.HTTPBadRequest`` if ``user_id`` is not an integer
        or a timestamp cannot be parsed.
        """
        data = app[datamodelsv]
        Dreamer = data.Dreamer

        conds = []
        if user_id is not None:
            try:
                uid = int(user_id)
            except (TypeError, ValueError):
                raise web.HTTPBadRequest(text="Invalid 'user_id' passed to user query!") from None
            conds.append(Dreamer.user_id == uid)
        if twitch_id is not None:
            conds.append(Dreamer.twitch_id == twitch_id)
        if name is not None:
            conds.append(Dreamer.name == name)
        if created_before is not None:
            cbefore = cls._parse_query_timestamp(created_before, 'created_before')
            conds.append(Dreamer.created_at <= cbefore)
        if created_after is not None:
            cafter = cls._parse_query_timestamp(created_after, 'created_after')
            conds.append(Dreamer.created_at >= cafter)

        rows = await Dreamer.fetch_where(*conds).order_by(Dreamer.created_at)
        return [cls(app, row) for row in rows]

    @classmethod
    async def create(cls, app: web.Application, **kwargs: Unpack[UserCreateParams]):
        """
        Create a new User from the provided data.

        This creates the associated UserProfile, TwitchProfile, and UserPreferences if needed.
        If a profile already exists, this does *not* error.
        Instead, this updates the existing User with the new data.

        Raises ``web.HTTPBadRequest`` if ``twitch_id`` is not a string of digits.
        """
        data = app[datamodelsv]

        twitch_id = kwargs['twitch_id']
        name = kwargs['name']
        prefs = kwargs.get('preferences')

        # Quick sanity check on the twitch id.
        # The isinstance guard turns e.g. a JSON number into a 400 rather than an AttributeError.
        if not isinstance(twitch_id, str) or not twitch_id.isdigit():
            raise web.HTTPBadRequest(text="Invalid 'twitch_id' passed to user creation!")

        # First check if the profile already exists by querying the Dreamer database
        edited = 0  # 0 means not edited, 1 means created, 2 means modified
        rows = await data.Dreamer.fetch_where(twitch_id=twitch_id)
        if rows:
            logger.debug(f"Updating Dreamer for {twitch_id=} with {kwargs}")
            dreamer = rows[0]

            # A twitch profile with this twitch_id already exists
            # But it is possible UserPreferences don't exist
            if dreamer.preferences is None and dreamer.name is None:
                await data.UserPreferences.fetch_or_create(dreamer.user_id, twitch_name=name, preferences=prefs)
                dreamer = await dreamer.refresh()
                edited = 2

            # Now compare the existing data against the provided data and update if needed.
            # A preferences-only change must trigger the update too,
            # otherwise new preferences are silently dropped when the name is unchanged.
            name_changed = name != dreamer.name
            prefs_changed = prefs is not None and prefs != dreamer.preferences
            if name_changed or prefs_changed:
                q = data.UserPreferences.table.update_where(profileid=dreamer.user_id)
                q.set(twitch_name=name)
                if prefs is not None:
                    q.set(preferences=prefs)
                await q
                dreamer = await dreamer.refresh()
                edited = 2
        else:
            # Create from scratch
            logger.info(f"Creating Dreamer for {twitch_id=} with {kwargs}")
            # TODO: Should be in a transaction.. actually let's add transactions to the middleware..
            profile_data = app[profiledatav]
            user_profile = await profile_data.UserProfileRow.create(nickname=name)
            await profile_data.TwitchProfileRow.create(
                profileid=user_profile.profileid,
                userid=twitch_id,
            )
            await data.UserPreferences.create(
                profileid=user_profile.profileid,
                twitch_name=name,
                preferences=prefs
            )
            dreamer = await data.Dreamer.fetch(user_profile.profileid)
            assert dreamer is not None
            edited = 1

        self = cls(app, dreamer)

        # Only report to the event log when something actually changed.
        if edited == 1:
            args = await self.event_log_args(title=f"User #{dreamer.user_id} created!")
            await event_log(**args.send_args)
        elif edited == 2:
            args = await self.event_log_args(title=f"User #{dreamer.user_id} updated!")
            await event_log(**args.send_args)

        return self

    async def edit(self, **kwargs: Unpack[UserEditParams]):
        """
        Update the user's name and/or preferences, then post an event log entry.

        Does nothing when neither key is provided.
        """
        # We can edit the name, and preferences
        prefs = await self.get_prefs()

        update_args = {}
        if 'name' in kwargs:
            update_args['twitch_name'] = kwargs['name']
        if 'preferences' in kwargs:
            update_args['preferences'] = kwargs['preferences']

        if update_args:
            logger.info(f"Updating dreamer {self.row=} with {kwargs}")
            await prefs.update(**update_args)
            # The update went to the preferences row, so refresh our own row
            # (as prepare() does) so the event log shows the new values.
            await self.row.refresh()
            args = await self.event_log_args(title=f"User #{self.row.user_id} updated!")
            await event_log(**args.send_args)

    async def delete(self) -> UserDetailsPayload:
        """Delete the user and return the detailed payload captured just before deletion."""
        payload = await self.prepare(details=True)

        # This will cascade to all other data the user has
        await self.profile_data.UserProfileRow.table.delete_where(profileid=self.row.user_id)

        # Make sure we take the user out of cache
        await self.row.refresh()

        return payload

    async def get_wallet(self):
        """Return the sum of the user's transaction amounts, or 0 when the sum is empty."""
        query = self.data.Transaction.table.select_where(user_id=self.row.user_id)
        query.select(wallet="SUM(amount)")
        query.with_no_adapter()
        results = await query
        # SUM over zero rows is NULL; report an empty wallet as 0 to match ``wallet: int``.
        return results[0]['wallet'] or 0

    async def get_specimen(self) -> Optional[Specimen]:
        """Return the user's active (not forgotten) specimen, or None if there is none."""
        active_specrows = await self.data.Specimen.fetch_where(
            owner_id=self.row.user_id,
            forgotten_at=NULL
        )
        if not active_specrows:
            return None
        return Specimen(self.app, active_specrows[0])

    async def get_inventory(self):
        """Return the user's inventory items. Not implemented yet: always empty."""
        return []

    async def event_log_args(self, **kwargs) -> MessageArgs:
        """
        Build the event log message describing this user.

        ``kwargs`` are passed through to ``discord.Embed`` (e.g. ``title``).
        """
        desc = '\n'.join(await self.tabulate())
        embed = discord.Embed(description=desc, timestamp=self.row.created_at, **kwargs)
        embed.set_footer(text='Created At')
        # TODO: We could add wallet, specimen, and inventory info here too
        return MessageArgs(embed=embed)

    async def tabulate(self):
        """
        Present the User as a discord-readable table.
        """
        row = self.row
        table = {
            'user_id': f"`{row.user_id}`",
            'twitch_id': f"`{row.twitch_id}`" if row.twitch_id else 'No Twitch linked',
            'name': f"`{row.name}`",
            'preferences': f"`{row.preferences}`",
            'created_at': discord.utils.format_dt(row.created_at, 'F'),
        }
        # Inside a method the bare name resolves to the imported module-level helper.
        return tabulate(*table.items())

    # Only the False overload carries a default, matching the implementation,
    # so a bare ``prepare()`` is typed as returning the basic payload.
    @overload
    async def prepare(self, details: Literal[True]) -> UserDetailsPayload:
        ...

    @overload
    async def prepare(self, details: Literal[False] = False) -> UserPayload:
        ...

    async def prepare(self, details: bool = False) -> UserPayload | UserDetailsPayload:
        """
        Serialise the user into a JSON-compatible payload.

        With ``details`` set, the payload also carries the active specimen,
        the inventory and the wallet.
        """
        # Since we are working with view rows, make sure we refresh
        row = self.row
        await row.refresh()

        base_user: UserPayload = {
            'user_id': row.user_id,
            'twitch_id': str(row.twitch_id) if row.twitch_id else None,
            'name': row.name,
            'preferences': row.preferences,
            'created_at': row.created_at.isoformat(),
        }

        user: UserPayload | UserDetailsPayload
        if details:
            # Now add details
            specimen = await self.get_specimen()
            sp_payload = await specimen.prepare() if specimen is not None else None
            inventory = [await item.prepare() for item in await self.get_inventory()]
            user = base_user | {
                'specimen': sp_payload,
                'inventory': inventory,
                'wallet': await self.get_wallet(),
            }
        else:
            user = base_user

        logger.debug(f"User prepared: {user}")
        return user
@routes.view('/users')
@routes.view('/users/', name='users')
class UsersView(web.View):
    """Collection endpoint for users: ``POST`` creates a user, ``GET`` searches users."""

    async def post(self):
        """
        Create (or update) a user from the JSON body and return its detailed payload.

        Raises ``web.HTTPBadRequest`` if the body is not a JSON object
        or fails ``User.validate_create_params``.
        """
        request = self.request

        # Malformed JSON raises a ValueError subclass; report it as a client error.
        try:
            params = await request.json()
        except ValueError:
            raise web.HTTPBadRequest(text="Request body must be valid JSON.") from None
        if not isinstance(params, dict):
            raise web.HTTPBadRequest(text="Request body must be a JSON object.")

        # Values stored on the request fill in keys the body did not provide.
        for key in create_fields:
            if key in request:
                params.setdefault(key, request[key])

        await User.validate_create_params(params)
        logger.info(f"Creating a new user with args: {params=}")
        user = await User.create(request.app, **params)
        logger.debug(f"Created user: {user!r}")

        payload = await user.prepare(details=True)
        return web.json_response(payload)

    async def get(self):
        """Return the detailed payloads of all users matching the provided filters."""
        request = self.request

        keys = [
            'user_id', 'twitch_id', 'name', 'created_before', 'created_after',
        ]
        # The query string takes priority over values stored on the request.
        filter_params = {
            key: request.query.get(key, request.get(key, None))
            for key in keys
        }

        logger.info(f"Querying users with params: {filter_params=}")
        users = await User.query(request.app, **filter_params)

        payload = [await user.prepare(details=True) for user in users]
        return web.json_response(payload)
@routes.view('/users/{user_id}')
@routes.view('/users/{user_id}/', name='user')
class UserView(web.View):
    """Single-user endpoint supporting ``GET``, ``PATCH`` and ``DELETE``."""

    async def resolve_user(self):
        """
        Return the User named by the ``user_id`` path segment.

        Raises ``web.HTTPNotFound`` if the segment is not an integer or no such user exists.
        """
        request = self.request
        # A non-integer id cannot name a user; answer 404 rather than crash on int().
        try:
            user_id = int(request.match_info['user_id'])
        except ValueError:
            raise web.HTTPNotFound(text="No user exists with the given ID.") from None

        user = await User.fetch_from_id(request.app, user_id)
        if user is None:
            raise web.HTTPNotFound(text="No user exists with the given ID.")
        return user

    async def get(self):
        """Return the detailed payload of the user."""
        user = await self.resolve_user()
        logger.info(f"Received GET for user {user=}")

        payload = await user.prepare(details=True)
        return web.json_response(payload)

    async def patch(self):
        """
        Apply the JSON body to the user and return the updated detailed payload.

        Raises ``web.HTTPBadRequest`` if the body is not a JSON object
        or names a field that cannot be edited.
        """
        user = await self.resolve_user()

        # Malformed JSON raises a ValueError subclass; report it as a client error.
        try:
            params = await self.request.json()
        except ValueError:
            raise web.HTTPBadRequest(text="Request body must be valid JSON.") from None
        if not isinstance(params, dict):
            raise web.HTTPBadRequest(text="Request body must be a JSON object.")

        edit_data = {}
        for key, value in params.items():
            if key not in edit_fields:
                raise web.HTTPBadRequest(text=f"You cannot update field '{key}' of User!")
            edit_data[key] = value

        # Values stored on the request fill in keys the body did not provide.
        for key in edit_fields:
            if key in self.request:
                edit_data.setdefault(key, self.request[key])

        logger.info(f"Received PATCH for user {user} with params: {params}")
        await user.edit(**edit_data)

        payload = await user.prepare(details=True)
        return web.json_response(payload)

    async def delete(self):
        """Delete the user and return the payload captured before deletion."""
        user = await self.resolve_user()
        logger.info(f"Received DELETE for user {user}")

        payload = await user.delete()
        return web.json_response(payload)
@routes.route('*', "/users/{user_id}{tail:/events}")
@routes.route('*', "/users/{user_id}{tail:/events/.*}")
@routes.route('*', "/users/{user_id}{tail:/transactions}")
@routes.route('*', "/users/{user_id}{tail:/transactions/.*}")
@routes.route('*', "/users/{user_id}{tail:/specimens}")
@routes.route('*', "/users/{user_id}{tail:/specimens/.*}")
async def user_prefix_routes(request: web.Request):
    """
    Forward ``/users/{user_id}/<resource>...`` to the top-level ``/<resource>...`` handler.

    The forwarded request is a clone with ``user_id`` stored on it.

    Raises ``web.HTTPNotFound`` if the id is not an integer, the user does not exist,
    or no handler is resolved for the new path.
    """
    # A non-integer id cannot name a user; answer 404 rather than crash on int().
    try:
        user_id = int(request.match_info['user_id'])
    except ValueError:
        raise web.HTTPNotFound(text="No user exists with the given ID.") from None

    user = await User.fetch_from_id(request.app, user_id)
    if user is None:
        raise web.HTTPNotFound(text="No user exists with the given ID.")

    new_path = request.match_info['tail']
    logger.info(f"Redirecting {request=} to {new_path=} and setting {user_id=}")

    # NOTE(review): rel_url is replaced by the bare tail path, so the original
    # query string does not appear to be carried over -- confirm this is intended.
    new_request = request.clone(rel_url=new_path)
    new_request['user_id'] = user_id

    # Resolve the cloned request manually and call its handler directly.
    match_info = await request.app.router.resolve(new_request)
    new_request._match_info = match_info
    match_info.current_app = request.app

    if not match_info.handler:
        logger.info(f"Could not find handler matching {new_request}")
        raise web.HTTPNotFound()
    return await match_info.handler(new_request)
async def _forward_specimen_request(request: web.Request, new_path: str, **stored):
    """Clone ``request`` onto ``new_path``, store ``stored`` on the clone, and run its handler."""
    new_request = request.clone(rel_url=new_path)
    for key, value in stored.items():
        new_request[key] = value

    # Resolve the cloned request manually and call its handler directly.
    match_info = await request.app.router.resolve(new_request)
    new_request._match_info = match_info
    match_info.current_app = request.app

    if not match_info.handler:
        logger.info(f"Could not find handler matching {new_request}")
        raise web.HTTPNotFound()
    return await match_info.handler(new_request)


@routes.route('*', "/users/{user_id}/specimen")
@routes.route('*', "/users/{user_id}/specimen{tail:/.*}")
async def user_specimen_route(request: web.Request):
    """
    Forward requests about the user's active specimen to the ``/specimens`` routes.

    - ``POST`` with an empty tail is forwarded to ``POST /specimens`` when the user
      has no active specimen, and rejected with ``web.HTTPBadRequest`` otherwise.
    - Any other request is forwarded to ``/specimens/{specimen_id}`` plus the tail.

    Raises ``web.HTTPNotFound`` if the id is not an integer, the user does not exist,
    or the user has no active specimen to forward to.
    """
    # A non-integer id cannot name a user; answer 404 rather than crash on int().
    try:
        user_id = int(request.match_info['user_id'])
    except ValueError:
        raise web.HTTPNotFound(text="No user exists with the given ID.") from None

    user = await User.fetch_from_id(request.app, user_id)
    if user is None:
        raise web.HTTPNotFound(text="No user exists with the given ID.")

    tail = request.match_info.get('tail', '')
    specimen = await user.get_specimen()

    if request.method == 'POST' and not tail.strip('/'):
        if specimen is not None:
            raise web.HTTPBadRequest(text="This user already has an active specimen!")

        # Redirect to POST /specimens
        # TODO: Would be nicer to use named handler here
        logger.info(f"Redirecting {request=} to POST /specimens")
        return await _forward_specimen_request(
            request, '/specimens',
            user_id=user_id,
            owner_id=user_id,
        )

    if specimen is None:
        raise web.HTTPNotFound(text="This user has no active specimen.")

    specimen_id = specimen.row.specimen_id
    # Redirect to /specimens/{specimen_id}/... keeping the request method
    new_path = f"/specimens/{specimen_id}" + tail
    logger.info(f"Redirecting {request=} to {new_path}")
    return await _forward_specimen_request(
        request, new_path,
        user_id=user_id,
        owner_id=user_id,
        specimen_id=specimen_id,
    )
@routes.route('GET', "/users/{user_id}/wallet")
@routes.route('GET', "/users/{user_id}/wallet/")
async def user_wallet_route(request: web.Request):
    """
    Return the user's wallet value as a JSON response.

    Raises ``web.HTTPNotFound`` if the id is not an integer or the user does not exist.
    """
    # A non-integer id cannot name a user; answer 404 rather than crash on int().
    try:
        user_id = int(request.match_info['user_id'])
    except ValueError:
        raise web.HTTPNotFound(text="No user exists with the given ID.") from None

    user = await User.fetch_from_id(request.app, user_id)
    if user is None:
        raise web.HTTPNotFound(text="No user exists with the given ID.")

    wallet = await user.get_wallet()
    return web.json_response(wallet)