Compare commits

...

2 Commits

Author SHA1 Message Date
77dc90cc32 feat(api): Initial API server and stamps routes. 2025-06-07 05:29:00 +10:00
a02cc0977a (document): Add created ts and file format. 2025-06-07 05:27:59 +10:00
7 changed files with 659 additions and 4 deletions

View File

@@ -189,9 +189,10 @@ CREATE TABLE stamp_types (
CREATE TABLE documents (
document_id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
document_data VARCHAR NOT NULL,
document_data TEXT NOT NULL,
seal INTEGER NOT NULL,
metadata TEXT
metadata TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE document_stamps (

87
src/api.py Normal file
View File

@@ -0,0 +1,87 @@
import sys
import os
import asyncio
from aiohttp import web
import logging
from meta import conf
from data import Database
from utils.auth import key_auth_factory
from datamodels import DataModel
from constants import DATA_VERSION
from routes.stamps import routes as stamp_routes
from routes.lib import dbvar, datamodelsv
sys.path.insert(0, os.path.join(os.getcwd()))
sys.path.insert(0, os.path.join(os.getcwd(), "src"))
logger = logging.getLogger(__name__)
"""
- `/stamps` with `POST`, `PUT`, `GET`
- `/stamps/{stamp_id}` with `GET`, `PATCH`, `DELETE`
- `/documents` with `POST, GET`
- `/documents/{document_id}` with `GET`, `PATCH`, `DELETE`
- `/documents/{document_id}/stamps` which is passed to `/stamps` with `document_id` set.
- `/events` with `POST`, `GET`
- `/events/{event_id}` with `GET`, `PATCH`, `DELETE`
- `/events/{event_id}/document` which is passed to `/documents/{document_id}`
- `/events/{event_id}/user` which is passed to `/users/{user_id}`
- `/users` with `POST`, `GET`, `PATCH`, `DELETE`
- `/users/{user_id}` with `GET`, `PATCH`, `DELETE`
- `/users/{user_id}/events` which is passed to `/events`
- `/users/{user_id}/specimen` which is passed to `/specimens/{specimen_id}`
- `/users/{user_id}/specimens` which is passed to `/specimens`
- `/users/{user_id}/wallet` with `GET`
- `/users/{user_id}/transactions` which is passed to `/transactions`
- `/specimens` with `GET` and `POST`
- `/specimens/{specimen_id}` with `PATCH` and `DELETE`
- `/specimens/{specimen_id}/owner` which is passed to `/users/{user_id}`
- `/transactions` with `POST`, `GET`
- `/transactions/{transaction_id}` with `GET`, `PATCH`, `DELETE`
- `/transactions/{transaction_id}/user` which is passed to `/users/{user_id}`
"""
async def attach_db(app: web.Application):
db = Database(conf.data['args'])
async with db.open():
version = await db.version()
if version.version != DATA_VERSION:
error = f"Data model version is {version}, required version is {DATA_VERSION}! Please migrate."
logger.critical(error)
raise RuntimeError(error)
datamodel = DataModel()
db.load_registry(datamodel)
await datamodel.init()
app[dbvar] = db
app[datamodelsv] = datamodel
yield
async def test(request: web.Request) -> web.Response:
return web.Response(text="Hello World")
async def app_factory():
auth = key_auth_factory(conf.API['TOKEN'])
app = web.Application(middlewares=[auth])
app.cleanup_ctx.append(attach_db)
app.router.add_get('/', test)
app.router.add_routes(stamp_routes)
return app
async def run_app():
app = await app_factory()
web.run_app(app)
if __name__ == '__main__':
asyncio.run(run_app())

View File

@@ -103,9 +103,10 @@ class DataModel(Registry):
------
CREATE TABLE documents (
document_id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
document_data VARCHAR NOT NULL,
document_data TEXT NOT NULL,
seal INTEGER NOT NULL,
metadata TEXT
metadata TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
"""
_tablename_ = 'documents'
@@ -115,6 +116,7 @@ class DataModel(Registry):
document_data = Column()
seal = Integer()
metadata = String()
created_at = Timestamp()
class DocumentStamp(RowModel):
"""

231
src/routes/documents.py Normal file
View File

@@ -0,0 +1,231 @@
"""
- `/documents` with `POST, GET`
- `/documents/{document_id}` with `GET`, `PATCH`, `DELETE`
- `/documents/{document_id}/stamps` which is passed to `/stamps` with `document_id` set.
"""
from datetime import datetime
from typing import Any, NamedTuple, Optional, Self, TypedDict, Unpack, reveal_type, List
from aiohttp import web
from data import Condition, condition
from data.queries import JOINTYPE
from datamodels import DataModel
from .lib import ModelField, datamodelsv
from .stamps import Stamp, StampCreateParams, StampEditParams, StampPayload
routes = web.RouteTableDef()
class DocPayload(TypedDict):
document_id: int
document_data: str
seal: int
created_at: str
metadata: Optional[str]
stamps: List[StampPayload]
class DocCreateParamsReq(TypedDict, total=True):
document_data: str
seal: int
class DocCreateParams(DocCreateParamsReq, total=False):
metadata: Optional[str]
stamps: List[StampCreateParams]
class DocEditParams(TypedDict, total=False):
document_data: str
seal: int
metadata: Optional[str]
stamps: List[StampCreateParams]
fields = [
ModelField('document_id', int, False, False, False),
ModelField('document_data', str, True, True, True),
ModelField('seal', int, True, True, True),
ModelField('created_at', str, False, False, False),
ModelField('metadata', Optional[str], False, True, True),
ModelField('stamps', List[StampCreateParams], False, True, True),
]
req_fields = {field.name for field in fields if field.required}
edit_fields = {field.name for field in fields if field.can_edit}
create_fields = {field.name for field in fields if field.can_create}
class Document:
def __init__(self, app: web.Application, row: DataModel.Document):
self.app = app
self.data = app[datamodelsv]
self.row = row
@classmethod
async def fetch_from_id(cls, app: web.Application, document_id: int) -> Optional[Self]:
data = app[datamodelsv]
row = await data.Document.fetch(document_id)
return cls(app, row) if row is not None else None
@classmethod
async def query(
cls,
app: web.Application,
document_id: Optional[int] = None,
seal: Optional[int] = None,
created_before: Optional[str] = None,
created_after: Optional[str] = None,
metadata: Optional[str] = None,
stamp_type: Optional[str] = None,
) -> List[Self]:
data = app[datamodelsv]
Doc = data.Document
conds = []
if document_id is not None:
conds.append(Doc.document_id == document_id)
if seal is not None:
conds.append(Doc.seal == seal)
if created_before is not None:
cbefore = datetime.fromisoformat(created_before)
conds.append(Doc.created_at <= cbefore)
if created_after is not None:
cafter = datetime.fromisoformat(created_after)
conds.append(Doc.created_at >= cafter)
if metadata is not None:
conds.append(Doc.metadata == metadata)
query = data.Document.table.fetch_rows_where(*conds)
results = await query
query = data.Document.table.select_where(*conds)
if stamp_type is not None:
query.join('document_stamps', using=('document_id',), join_type=JOINTYPE.LEFT)
query.join(
'stamp_types',
on=(data.DocumentStamp.stamp_type == data.StampType.stamp_type_id)
)
query.where(data.StampType.stamp_type_name == stamp_type)
query.select(docid = "DISTINCT(document_id)")
query.with_no_adapter()
results = await query
ids = [result['docid'] for result in results]
if ids:
rows = await data.Document.table.fetch_rows_where(
document_id=ids
)
else:
rows = []
else:
rows = await query
return [cls(app, row) for row in rows]
@classmethod
async def create(cls, app: web.Application, **kwargs: Unpack[DocCreateParams]) -> Self:
data = app[datamodelsv]
document_data = kwargs['document_data']
seal = kwargs['seal']
stamp_params = kwargs.get('stamps', [])
metadata = kwargs.get('metadata')
# Build the document first
row = await data.Document.create(
document_data=document_data,
seal=seal,
metadata=metadata,
)
# Then build the stamps
stamps = []
for stampdata in stamp_params:
stampdata.setdefault('document_id', row.document_id)
stamp = await Stamp.create(app, **stampdata)
stamps.append(stamp)
return cls(app, row)
async def get_stamps(self) -> List[Stamp]:
stamprows = await self.data.DocumentStamp.table.fetch_rows_where(document_id=self.row.document_id)
return [Stamp(self.app, row) for row in stamprows]
async def prepare(self) -> DocPayload:
stamps = await self.get_stamps()
results: DocPayload = {
'document_id': self.row.document_id,
'document_data': self.row.document_data,
'seal': self.row.seal,
'created_at': self.row.created_at.isoformat(),
'metadata': self.row.metadata,
'stamps': [await stamp.prepare() for stamp in stamps]
}
return results
async def edit(self, **kwargs: Unpack[DocEditParams]):
data = self.data
row = self.row
# Update the row data
# If stamps are given, delete the existing ones
# Then write in the new ones.
update_args = {}
for key in {'document_data', 'seal', 'metadata'}:
if key in kwargs:
update_args[key] = kwargs[key]
if update_args:
await self.row.update(**update_args)
# TODO: Should really be in a transaction
# Actually each handler should be in a transaction
if new_stamps := kwargs.get('stamps', []):
await self.data.DocumentStamp.table.delete_where(document_id=self.row.document_id)
for stampdata in new_stamps:
stampdata.setdefault('document_id', row.document_id)
await Stamp.create(self.app, **stampdata)
async def delete(self) -> DocPayload:
payload = await self.prepare()
await self.row.delete()
return payload
@routes.view('/documents')
class DocumentsView(web.View):
async def get(self):
request = self.request
filter_params = {}
keys = [
'document_id',
'seal',
'created_before',
'created_after',
'metadata',
'stamp_type',
]
for key in keys:
if key in request.query:
filter_params[key] = request.query[key]
elif key in request:
filter_params[key] = request[key]
documents = await Document.query(request.app, **filter_params)
payload = [await doc.prepare() for doc in documents]
return web.json_response(payload)
async def post(self):
request = self.request
params = await request.json()
for key in create_fields:
if key in request:
params.setdefault(key, request[key])
if extra := next((key for key in params if key not in create_fields), None):
raise web.HTTPBadRequest(text=f"Invalid key '{extra}' passed to document creation.")
if missing := next((key for key in req_fields if key not in params), None):
raise web.HTTPBadRequest(text=f"Document params missing required key '{missing}'")
document = await Document.create(self.request.app, **params)
payload = await document.prepare()
return web.json_response(payload)

47
src/routes/lib.py Normal file
View File

@@ -0,0 +1,47 @@
from typing import NamedTuple, Any, Optional, Self, Unpack, List, TypedDict
from aiohttp import web
from data.database import Database
from datamodels import DataModel
dbvar = web.AppKey("database", Database)
datamodelsv = web.AppKey("datamodels", DataModel)
class ModelField(NamedTuple):
name: str
typ: Any
required: bool
can_create: bool
can_edit: bool
class ModelClassABC[RowT, Payload: TypedDict, CreateParams: TypedDict, EditParams: TypedDict]:
def __init__(self, app: web.Application, row: RowT):
self.app = app
self.data = app[datamodelsv]
self.row = row
@classmethod
async def fetch_from_id(cls, app: web.Application, document_id: int) -> Optional[Self]:
...
@classmethod
async def query(
cls,
**kwargs
) -> List[Self]:
...
@classmethod
async def create(cls, app: web.Application, **kwargs: Unpack[CreateParams]) -> Self:
...
async def prepare(self) -> Payload:
...
async def edit(self, **kwargs: Unpack[EditParams]):
...
async def delete(self) -> Payload:
...

260
src/routes/stamps.py Normal file
View File

@@ -0,0 +1,260 @@
from typing import Any, NamedTuple, Optional, TypedDict, Unpack, reveal_type
from aiohttp import web
from data.database import Database
from datamodels import DataModel
from .lib import datamodelsv, ModelField
routes = web.RouteTableDef()
class StampPayload(TypedDict):
stamp_id: int
document_id: int
stamp_type: str
pos_x: int
pos_y: int
rotation: float
class StampCreateParamsReq(TypedDict, total=True):
document_id: int
stamp_type: str
pos_x: int
pos_y: int
rotation: float
class StampCreateParams(StampCreateParamsReq, total=False):
pass
class StampEditParams(TypedDict, total=False):
document_id: int
stamp_type: str
pos_x: int
pos_y: int
rotation: float
fields = [
ModelField('stamp_id', int, False, False, False),
ModelField('document_id', int, True, True, True),
ModelField('stamp_type', int, True, True, True),
ModelField('pos_x', int, True, True, True),
ModelField('pos_y', int, True, True, True),
ModelField('rotation', float, True, True, True),
]
req_fields = {field.name for field in fields if field.required}
edit_fields = {field.name for field in fields if field.can_edit}
create_fields = {field.name for field in fields if field.can_create}
class Stamp:
def __init__(self, app: web.Application, row: DataModel.DocumentStamp):
self.app = app
self.data = app[datamodelsv]
self.row = row
@classmethod
async def fetch_from_id(cls, app: web.Application, stamp_id: int) -> Optional['Stamp']:
stamp = await app[datamodelsv].DocumentStamp.fetch(stamp_id)
if stamp is None:
return None
return cls(app, stamp)
@classmethod
async def query(
cls,
app: web.Application,
stamp_id: Optional[int] = None,
document_id: Optional[int] = None,
stamp_type: Optional[str] = None,
):
data = app[datamodelsv]
query_args = {}
if stamp_id is not None:
query_args['stamp_id'] = stamp_id
if document_id is not None:
query_args['document_id'] = document_id
if stamp_type is not None:
typerows = await data.StampType.table.fetch_rows_where(stamp_type_name=stamp_type)
typeids = [row.stamp_type_id for row in typerows]
if not typeids:
return []
query_args['stamp_type'] = typeids
results = await data.DocumentStamp.table.fetch_rows_where(**query_args)
return [cls(app, row) for row in results]
@classmethod
async def create(
cls,
app: web.Application,
**kwargs: Unpack[StampCreateParams]
):
data = app[datamodelsv]
stamp_type = kwargs['stamp_type']
# Get the stamp_type
rows = await data.StampType.table.fetch_rows_where(stamp_type_name=stamp_type)
if not rows:
# Create the stamp type
row = await data.StampType.create(stamp_type_name=stamp_type)
else:
row = rows[0]
stamprow = await data.DocumentStamp.create(
document_id=kwargs['document_id'],
stamp_type=row.stamp_type_id,
position_x=int(kwargs['pos_x']),
position_y=int(kwargs['pos_y']),
rotation=float(kwargs['rotation'])
)
return cls(app, stamprow)
async def prepare(self) -> StampPayload:
typerow = await self.data.StampType.fetch(self.row.stamp_type)
assert typerow is not None
results: StampPayload = {
'stamp_id': self.row.stamp_id,
'document_id': self.row.document_id,
'stamp_type': typerow.stamp_type_name,
'pos_x': self.row.position_x,
'pos_y': self.row.position_y,
'rotation': self.row.rotation,
}
return results
async def edit(
self,
**kwargs: Unpack[StampEditParams]
):
data = self.data
row = self.row
edit_args = {}
if stamp_type := kwargs.get('stamp_type'):
# Get the stamp_type
rows = await data.StampType.table.fetch_rows_where(stamp_type_name=stamp_type)
if not rows:
# Create the stamp type
row = await data.StampType.create(stamp_type_name=stamp_type)
else:
row = rows[0]
edit_args['stamp_type'] = row.stamp_type_id
simple_keys = {
'document_id': 'document_id',
'pos_x': 'position_x',
'pos_y': 'position_y',
'rotation': 'rotation'
}
for editkey, datakey in simple_keys.values():
if editkey in kwargs:
edit_args[datakey] = kwargs[editkey]
await self.row.update(
**edit_args
)
async def delete(self) -> StampPayload:
payload = await self.prepare()
await self.row.delete()
return payload
@routes.view('/stamps')
class StampsView(web.View):
async def get(self):
request = self.request
# Decode request parameters to filter args
filter_params = {}
keys = ['stamp_id', 'document_id', 'stamp_type']
for key in keys:
if key in request.query:
filter_params[key] = request.query[key]
elif key in request:
filter_params[key] = request[key]
stamps = await Stamp.query(request.app, **filter_params)
payload = [await stamp.prepare() for stamp in stamps]
return web.json_response(payload)
async def create_one(self, params: StampCreateParams):
if extra := next((key for key in params if key not in create_fields), None):
raise web.HTTPBadRequest(text=f"Invalid key '{extra}' passed to stamp creation.")
if missing := next((key for key in req_fields if key not in params), None):
raise web.HTTPBadRequest(text=f"Stamp params missing required key '{missing}'.")
# This still doesn't guarantee that the values are of the correct type, but good enough.
stamp = await Stamp.create(self.request.app, **params)
return stamp
async def post(self):
request = self.request
params = await request.json()
for key in create_fields:
if key in request:
params.setdefault(key, request[key])
stamp = await self.create_one(params)
payload = await stamp.prepare()
return web.json_response(payload)
async def put(self):
request = self.request
from_request = {key: request[key] for key in create_fields if key in request}
argslist = await request.json()
payloads = []
for args in argslist:
stamp = await self.create_one(from_request | args)
payload = await stamp.prepare()
payloads.append(payload)
return web.json_response(payloads)
@routes.view('/stamps/{stamp_id}')
class StampView(web.View):
async def resolve_stamp(self):
request = self.request
stamp_id = request.match_info['stamp_id']
stamp = await Stamp.fetch_from_id(request.app, int(stamp_id))
if stamp is None:
raise web.HTTPNotFound(text="No stamp exists with the given ID.")
return stamp
async def get(self):
stamp = await self.resolve_stamp()
payload = await stamp.prepare()
return web.json_response(payload)
async def patch(self):
stamp = await self.resolve_stamp()
params = await self.request.json()
edit_data = {}
for key, value in params.items():
if key not in edit_fields:
raise web.HTTPBadRequest(text=f"You cannot update field '{key}' of Stamp!")
edit_data[key] = value
for key in edit_fields:
if key in self.request:
edit_data.setdefault(key, self.request[key])
await stamp.edit(**edit_data)
payload = await stamp.prepare()
return web.json_response(payload)
async def delete(self):
stamp = await self.resolve_stamp()
payload = await stamp.delete()
return web.json_response(payload)

27
src/utils/auth.py Normal file
View File

@@ -0,0 +1,27 @@
from typing import Awaitable, Callable
from aiohttp import hdrs, web
CHALLENGE = web.Response(
body="<b> 401 UNAUTHORIZED </b>",
status=401,
reason='UNAUTHORIZED',
headers={
hdrs.WWW_AUTHENTICATE: 'X-API-KEY',
hdrs.CONTENT_TYPE: 'text/html; charset=utf-8',
hdrs.CONNECTION: 'close',
},
)
def key_auth_factory(required_token: str):
"""
Creates an aiohttp middleware that ensures
the `required_token` is provided in the `X-API-KEY` header.
"""
@web.middleware
async def key_auth_middleware(request: web.Request, handler: Callable[[web.Request], Awaitable[web.StreamResponse]]) -> web.StreamResponse:
auth_header = request.headers.get('X-API-KEY')
if not auth_header or auth_header.strip() != required_token:
return CHALLENGE
else:
return await handler(request)
return key_auth_middleware