Files
adventures/src/routes/documents.py

303 lines
10 KiB
Python

"""
- `/documents` with `POST, GET`
- `/documents/{document_id}` with `GET`, `PATCH`, `DELETE`
- `/documents/{document_id}/stamps` which is passed to `/stamps` with `document_id` set.
"""
import logging
from datetime import datetime
from typing import Any, NamedTuple, Optional, Self, TypedDict, Unpack, reveal_type, List
from aiohttp import web
from data import Condition, condition
from data.queries import JOINTYPE
from datamodels import DataModel
from .lib import ModelField, datamodelsv
from .stamps import Stamp, StampCreateParams, StampEditParams, StampPayload
routes = web.RouteTableDef()
logger = logging.getLogger(__name__)
class DocPayload(TypedDict):
document_id: int
document_data: str
seal: int
created_at: str
metadata: Optional[str]
stamps: List[StampPayload]
class DocCreateParamsReq(TypedDict, total=True):
document_data: str
seal: int
class DocCreateParams(DocCreateParamsReq, total=False):
metadata: Optional[str]
stamps: List[StampCreateParams]
class DocEditParams(TypedDict, total=False):
document_data: str
seal: int
metadata: Optional[str]
stamps: List[StampCreateParams]
fields = [
ModelField('document_id', int, False, False, False),
ModelField('document_data', str, True, True, True),
ModelField('seal', int, True, True, True),
ModelField('created_at', str, False, False, False),
ModelField('metadata', Optional[str], False, True, True),
ModelField('stamps', List[StampCreateParams], False, True, True),
]
req_fields = {field.name for field in fields if field.required}
edit_fields = {field.name for field in fields if field.can_edit}
create_fields = {field.name for field in fields if field.can_create}
class Document:
def __init__(self, app: web.Application, row: DataModel.Document):
self.app = app
self.data = app[datamodelsv]
self.row = row
@classmethod
async def validate_create_params(cls, params):
if extra := next((key for key in params if key not in create_fields), None):
raise web.HTTPBadRequest(text=f"Invalid key '{extra}' passed to document creation.")
if missing := next((key for key in req_fields if key not in params), None):
raise web.HTTPBadRequest(text=f"Document params missing required key '{missing}'")
@classmethod
async def fetch_from_id(cls, app: web.Application, document_id: int) -> Optional[Self]:
data = app[datamodelsv]
row = await data.Document.fetch(document_id)
return cls(app, row) if row is not None else None
@classmethod
async def query(
cls,
app: web.Application,
document_id: Optional[int] = None,
seal: Optional[int] = None,
created_before: Optional[str] = None,
created_after: Optional[str] = None,
metadata: Optional[str] = None,
stamp_type: Optional[str] = None,
) -> List[Self]:
data = app[datamodelsv]
Doc = data.Document
conds = []
if document_id is not None:
conds.append(Doc.document_id == int(document_id))
if seal is not None:
conds.append(Doc.seal == int(seal))
if created_before is not None:
cbefore = datetime.fromisoformat(created_before)
conds.append(Doc.created_at <= cbefore)
if created_after is not None:
cafter = datetime.fromisoformat(created_after)
conds.append(Doc.created_at >= cafter)
if metadata is not None:
conds.append(Doc.metadata == metadata)
query = data.Document.table.fetch_rows_where(*conds)
# results = await query
# query = data.Document.table.select_where(*conds)
if stamp_type is not None:
query.join('document_stamps', using=('document_id',), join_type=JOINTYPE.LEFT)
query.join(
'stamp_types',
on=(data.DocumentStamp.stamp_type == data.StampType.stamp_type_id)
)
query.where(data.StampType.stamp_type_name == stamp_type)
query.select(docid = "DISTINCT(document_id)")
query.with_no_adapter()
results = await query
ids = [result['docid'] for result in results]
if ids:
rows = await data.Document.table.fetch_rows_where(
document_id=ids
)
else:
rows = []
else:
rows = await query
return [cls(app, row) for row in sorted(rows, key=lambda row:row.created_at)]
@classmethod
async def create(cls, app: web.Application, **kwargs: Unpack[DocCreateParams]) -> Self:
data = app[datamodelsv]
document_data = kwargs['document_data']
seal = kwargs['seal']
stamp_params = kwargs.get('stamps', [])
metadata = kwargs.get('metadata')
# Build the document first
row = await data.Document.create(
document_data=document_data,
seal=seal,
metadata=metadata,
)
# Then build the stamps
stamps = []
for stampdata in stamp_params:
stampdata.setdefault('document_id', row.document_id)
stamp = await Stamp.create(app, **stampdata)
stamps.append(stamp)
return cls(app, row)
async def get_stamps(self) -> List[Stamp]:
stamprows = await self.data.DocumentStamp.table.fetch_rows_where(document_id=self.row.document_id).order_by('stamp_id')
return [Stamp(self.app, row) for row in stamprows]
async def prepare(self) -> DocPayload:
stamps = await self.get_stamps()
results: DocPayload = {
'document_id': self.row.document_id,
'document_data': self.row.document_data,
'seal': self.row.seal,
'created_at': self.row.created_at.isoformat(),
'metadata': self.row.metadata,
'stamps': [await stamp.prepare() for stamp in stamps]
}
return results
async def edit(self, **kwargs: Unpack[DocEditParams]):
data = self.data
row = self.row
# Update the row data
# If stamps are given, delete the existing ones
# Then write in the new ones.
update_args = {}
for key in {'document_data', 'seal', 'metadata'}:
if key in kwargs:
update_args[key] = kwargs[key]
if update_args:
await self.row.update(**update_args)
# TODO: Should really be in a transaction
# Actually each handler should be in a transaction
if new_stamps := kwargs.get('stamps', []):
await self.data.DocumentStamp.table.delete_where(document_id=self.row.document_id)
for stampdata in new_stamps:
stampdata.setdefault('document_id', row.document_id)
await Stamp.create(self.app, **stampdata)
async def delete(self) -> DocPayload:
payload = await self.prepare()
await self.row.delete()
return payload
@routes.view('/documents')
@routes.view('/documents/')
class DocumentsView(web.View):
async def get(self):
request = self.request
filter_params = {}
keys = [
'document_id',
'seal',
'created_before',
'created_after',
'metadata',
'stamp_type',
]
for key in keys:
if key in request.query:
filter_params[key] = request.query[key]
elif key in request:
filter_params[key] = request[key]
documents = await Document.query(request.app, **filter_params)
payload = [await doc.prepare() for doc in documents]
return web.json_response(payload)
async def post(self):
request = self.request
params = await request.json()
for key in create_fields:
if key in request:
params.setdefault(key, request[key])
await Document.validate_create_params(params)
document = await Document.create(self.request.app, **params)
payload = await document.prepare()
return web.json_response(payload)
@routes.view('/documents/{document_id}')
@routes.view('/documents/{document_id}/')
class DocumentView(web.View):
async def resolve_document(self):
request = self.request
document_id = request.match_info['document_id']
document = await Document.fetch_from_id(request.app, int(document_id))
if document is None:
raise web.HTTPNotFound(text="No document exists with the given ID.")
return document
async def get(self):
doc = await self.resolve_document()
payload = await doc.prepare()
return web.json_response(payload)
async def patch(self):
doc = await self.resolve_document()
params = await self.request.json()
edit_data = {}
for key, value in params.items():
if key not in edit_fields:
raise web.HTTPBadRequest(text=f"You cannot update field '{key}' of Document!")
edit_data[key] = value
for key in edit_fields:
if key in self.request:
edit_data.setdefault(key, self.request[key])
await doc.edit(**edit_data)
payload = await doc.prepare()
return web.json_response(payload)
async def delete(self):
doc = await self.resolve_document()
payload = await doc.delete()
return web.json_response(payload)
# We have one prefix route, /documents/{document_id}/stamps
@routes.route('*', "/documents/{document_id}{tail:/stamps}")
@routes.route('*', "/documents/{document_id}{tail:/stamps/.*}")
async def document_stamps_route(request: web.Request):
document_id = int(request.match_info['document_id'])
document = await Document.fetch_from_id(request.app, document_id)
if document is None:
raise web.HTTPNotFound(text="No document exists with the given ID.")
new_path = request.match_info['tail']
logger.info(f"Redirecting {request=} to {new_path=} and setting {document_id=}")
new_request = request.clone(rel_url=new_path)
new_request['document_id'] = document_id
match_info = await request.app.router.resolve(new_request)
match_info.current_app = request.app
new_request._match_info = match_info
if match_info.handler:
return await match_info.handler(new_request)
else:
raise web.HTTPNotFound()