Initial Demo client and flows.

This commit is contained in:
2025-06-11 19:42:22 +10:00
commit 0d831d10f4
5 changed files with 904 additions and 0 deletions

2
requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
aiohttp
pillow

590
src/client.py Normal file
View File

@@ -0,0 +1,590 @@
import json
import logging
import asyncio
from datetime import datetime
from typing import NamedTuple, Optional
from aiohttp import ClientSession
logger = logging.getLogger(__name__)
class HTTPException(Exception):
def __init__(self, response, message):
self.response = response
self.stats = response.status
error = "{} {}: {}".format(response.status, response.reason, message)
super().__init__(error)
class Route(NamedTuple):
path: str
method: str
params: dict[str, int|str]
url: str
def url_for(self):
return self.url.format(**self.params)
class Router:
def __init__(self, base_url: str):
self.base_url = base_url
def route(self, method, path, **params) -> Route:
url = self.base_url + path
route = Route(
method=method,
path=path,
params=params,
url=url,
)
return route
class DemoClient:
user_agent = "Dreamspace Demo API client written by Interitio (cona@thewisewolf.dev)"
def __init__(self, base_url: str, apikey: str, session: ClientSession):
self.router = Router(base_url)
self.apikey = apikey
self.session = session
async def request(self, route: Route, payload=None, query=None, **kwargs):
if self.session.closed:
raise ValueError("Session is closed or not started.")
headers = {
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": self.user_agent,
"X-API-KEY": self.apikey,
}
data = json.dumps(payload) if payload is not None else None
params = query
logger.info(f"Sending {route.method} to '{route.url_for()}' with {params=} and {data=}")
async with self.session.request(route.method, route.url_for(), headers=headers, data=data, params=params) as resp:
text = await resp.text(encoding='utf-8')
logger.debug(f"{route.url} response {resp.status}: {text}")
if 300 > resp.status >= 200:
return json.loads(text)
else:
raise HTTPException(resp, text)
# --------------------
# Users Chapter
# --------------------
async def get_users(
self,
user_id: Optional[int]=None,
twitch_id: Optional[str]=None,
name: Optional[str]=None,
created_after: Optional[datetime]=None,
created_before: Optional[datetime]=None,
):
query = {
'user_id': str(user_id) if user_id else None,
'twitch_id': twitch_id,
'name': name,
'created_after': created_after.isoformat() if created_after else None,
'created_before': created_before.isoformat() if created_before else None,
}
query = {k: v for k, v in query.items() if v is not None}
route = self.router.route('GET', '/users')
return await self.request(route, query=query)
async def create_user(
self,
twitch_id: str,
name: str,
preferences: Optional[str] = None
):
payload = {'twitch_id': twitch_id, 'name': name}
if preferences is not None:
payload['preferences'] = preferences
route = self.router.route('POST', '/users')
return await self.request(route, payload=payload)
async def get_user(self, user_id: int):
route = self.router.route('GET', '/users/{user_id}', user_id=user_id)
return await self.request(route)
async def update_user(self, user_id: int, name: Optional[str] = None, preferences: Optional[str] = None):
payload = {}
if name is not None:
payload['name'] = name
if preferences is not None:
payload['preferences'] = preferences
route = self.router.route('PATCH', '/users/{user_id}', user_id=user_id)
return await self.request(route)
async def delete_user(self, user_id: int):
route = self.router.route('DELETE', '/users/{user_id}', user_id=user_id)
return await self.request(route)
async def get_user_events(self, user_id: int):
# This supports the same filters as get_events, but we don't implement those here
# We could also implement create_user_event with subtypes, but we don't
route = self.router.route('GET', '/users/{user_id}/events', user_id=user_id)
return await self.request(route)
async def get_user_specimen(self, user_id: int):
route = self.router.route('GET', '/users/{user_id}/specimen', user_id=user_id)
return await self.request(route)
async def create_user_specimen(
self, user_id: int,
born_at: Optional[datetime] = None, forgotten_at: Optional[datetime] = None
):
payload = {}
if born_at is not None:
payload['born_at'] = born_at.isoformat()
if forgotten_at is not None:
payload['forgotten_at'] = forgotten_at.isoformat()
route = self.router.route('POST', '/users/{user_id}/specimen', user_id=user_id)
return await self.request(route, payload=payload)
async def get_user_transactions(self, user_id: int):
route = self.router.route('GET', '/users/{user_id}/transactions', user_id=user_id)
return await self.request(route)
# --------------------
# Specimens Chapter
# --------------------
async def get_specimens(
self,
specimen_id: Optional[int] = None,
owner_id: Optional[int] = None,
born_after: Optional[datetime]=None,
born_before: Optional[datetime]=None,
forgotten: Optional[bool]=None,
forgotten_after: Optional[datetime]=None,
forgotten_before: Optional[datetime]=None,
):
query = {
'specimen_id': str(specimen_id) if specimen_id is not None else None,
'owner_id': str(owner_id) if owner_id is not None else None,
'born_after': born_after.isoformat() if born_after else None,
'born_before': born_before.isoformat() if born_before else None,
'forgotten': ('true' if forgotten else 'false') if forgotten is not None else None,
'forgotten_after': forgotten_after.isoformat() if forgotten_after else None,
'forgotten_before': forgotten_before.isoformat() if forgotten_before else None,
}
query = {k: v for k, v in query.items() if v is not None}
route = self.router.route('GET', '/specimens')
return await self.request(route, query=query)
async def create_specimen(
self,
owner_id: int,
born_at: Optional[datetime] = None,
forgotten_at: Optional[datetime] = None,
):
payload = {'owner_id': owner_id}
if born_at is not None:
payload['born_at'] = born_at.isoformat()
if forgotten_at is not None:
payload['forgotten_at'] = forgotten_at.isoformat()
route = self.router.route('POST', '/specimens')
return await self.request(route, payload=payload)
async def create_specimen_stateless(
self,
owner: dict,
born_at: Optional[datetime] = None,
forgotten_at: Optional[datetime] = None,
):
payload = {'owner': owner}
if born_at is not None:
payload['born_at'] = born_at.isoformat()
if forgotten_at is not None:
payload['forgotten_at'] = forgotten_at.isoformat()
route = self.router.route('POST', '/specimens')
return await self.request(route, payload=payload)
async def get_specimen(self, specimen_id: int):
route = self.router.route('GET', '/specimens/{specimen_id}', specimen_id=specimen_id)
return await self.request(route)
async def update_specimen(
self,
specimen_id: int,
owner_id: Optional[int] = None,
forgotten_at: Optional[datetime] = None
):
payload = {}
if owner_id is not None:
payload['owner_id'] = owner_id
if forgotten_at is not None:
payload['forgotten_at'] = forgotten_at.isoformat()
route = self.router.route('PATCH', '/specimens/{specimen_id}', specimen_id=specimen_id)
return await self.request(route)
async def forget_specimen(self, specimen_id: int):
await self.update_specimen(specimen_id, forgotten_at=datetime.now())
async def delete_specimen(self, specimen_id: int):
route = self.router.route('DELETE', '/specimens/{specimen_id}', specimen_id=specimen_id)
return await self.request(route)
async def get_specimen_owner(self, specimen_id: int):
route = self.router.route('GET', '/specimens/{specimen_id}/owner', specimen_id=specimen_id)
return await self.request(route)
# --------------------
# Documents Chapter
# --------------------
async def get_documents(
self,
document_id: Optional[int] = None,
seal: Optional[int] = None,
created_before: Optional[datetime] = None,
created_after: Optional[datetime] = None,
metadata: Optional[str] = None,
stamp_type: Optional[str] = None,
):
query = {
'document_id': str(document_id) if document_id is not None else None,
'seal': str(seal) if seal is not None else None,
'created_after': created_after.isoformat() if created_after else None,
'created_before': created_before.isoformat() if created_before else None,
'metadata': metadata,
'stamp_type': stamp_type,
}
query = {k: v for k, v in query.items() if v is not None}
route = self.router.route('GET', '/documents')
return await self.request(route, query=query)
async def create_document(
self,
document_data: str,
seal: int,
metadata: Optional[str] = None,
stamps: list[dict] = [],
):
payload = {
'document_data': document_data,
'seal': seal,
'metadata': metadata,
'stamps': stamps,
}
route = self.router.route('POST', '/documents')
return await self.request(route, payload=payload)
async def get_document(self, document_id: int):
route = self.router.route('GET', '/documents/{document_id}', document_id=document_id)
return await self.request(route)
async def update_document(
self,
document_id: int,
document_data: Optional[str] = None,
seal: Optional[int] = None,
metadata: Optional[str] = None,
stamps: list[dict] = [],
):
payload = {
'document_data': document_data,
'seal': str(seal) if seal is not None else None,
'metadata': metadata,
'stamps': stamps,
}
payload = {k: v for k, v in payload.items() if v}
route = self.router.route('PATCH', '/documents/{document_id}', document_id=document_id)
return await self.request(route, payload=payload)
async def delete_document(self, document_id: int):
route = self.router.route('DELETE', '/documents/{document_id}', document_id=document_id)
return await self.request(route)
async def get_document_stamps(self, document_id: int):
route = self.router.route('GET', '/documents/{document_id}/stamps', document_id=document_id)
return await self.request(route)
async def post_document_stamps(self, document_id: int, **stamp_args):
route = self.router.route('POST', '/documents/{document_id}/stamps', document_id=document_id)
return await self.request(route, payload=stamp_args)
async def put_document_stamps(self, document_id: int, *stamp_args):
route = self.router.route('PUT', '/documents/{document_id}/stamps', document_id=document_id)
return await self.request(route, payload=stamp_args)
# --------------------
# Stamps Chapter
# --------------------
async def get_stamps(
self,
stamp_id: Optional[int] = None,
document_id: Optional[int] = None,
stamp_type: Optional[str] = None,
):
query = {
'stamp_id': str(stamp_id) if stamp_id is not None else None,
'document_id': str(document_id) if document_id is not None else None,
'stamp_type': stamp_type
}
query = {k: v for k, v in query.items() if v is not None}
route = self.router.route('GET', '/stamps')
return await self.request(route, query=query)
async def create_stamp(
self,
document_id: int,
stamp_type: str,
pos_x: int,
pos_y: int,
rotation: float,
):
payload = {
'document_id': document_id,
'stamp_type': stamp_type,
'pos_x': pos_x,
'pos_y': pos_y,
'rotation': rotation,
}
route = self.router.route('POST', '/stamps')
return await self.request(route, payload=payload)
async def create_stamps(self, *stamp_args):
route = self.router.route('PUT', '/stamps')
return await self.request(route, payload=stamp_args)
async def get_stamp(self, stamp_id: int):
route = self.router.route('GET', '/stamps/{stamp_id}', stamp_id=stamp_id)
return await self.request(route)
async def update_stamp(
self,
stamp_id: int,
document_id: Optional[int] = None,
stamp_type: Optional[str] = None,
pos_x: Optional[int] = None,
pos_y: Optional[int] = None,
rotation: Optional[float] = None,
):
payload = {
'document_id': document_id,
'stamp_type': stamp_type,
'pos_x': pos_x,
'pos_y': pos_y,
'rotation': rotation,
}
payload = {k: v for k, v in payload.items() if v is not None}
route = self.router.route('PATCH', '/stamps/{stamp_id}', stamp_id=stamp_id)
return await self.request(route, payload=payload)
async def delete_stamp(self, stamp_id: int):
route = self.router.route('DELETE', '/stamps/{stamp_id}', stamp_id=stamp_id)
return await self.request(route)
# --------------------
# Transactions Chapter
# --------------------
async def get_transactions(
self,
transaction_id: Optional[int] = None,
user_id: Optional[int] = None,
description: Optional[str] = None,
reference: Optional[str] = None,
created_before: Optional[datetime] = None,
created_after: Optional[datetime] = None,
):
query = {
'transaction_id': int(transaction_id) if transaction_id else None,
'user_id': int(user_id) if user_id else None,
'description': description,
'reference': reference,
'created_after': created_after.isoformat() if created_after else None,
'created_before': created_before.isoformat() if created_before else None,
}
query = {k: v for k, v in query.items() if v is not None}
route = self.router.route('GET', '/transactions')
return await self.request(route, query=query)
async def create_transaction(
self,
user_id: int,
amount: int,
description: str,
reference: Optional[str] = None,
):
payload = {
'user_id': user_id,
'amount': amount,
'description': description,
}
if reference is not None:
payload['reference'] = reference
route = self.router.route('POST', '/transactions')
return await self.request(route, payload=payload)
async def get_transaction(self, transaction_id: int):
route = self.router.route('GET', '/transactions/{transaction_id}', transaction_id=transaction_id)
return await self.request(route)
async def get_transaction_user(self, transaction_id: int):
route = self.router.route('GET', '/transactions/{transaction_id}/user', transaction_id=transaction_id)
return await self.request(route)
# --------------------
# Events Chapter
# --------------------
async def get_events(
self,
event_id: Optional[int] = None,
document_id: Optional[int] = None,
document_seal: Optional[int] = None,
user_id: Optional[int] = None,
user_name: Optional[str] = None,
occurred_before: Optional[datetime] = None,
occurred_after: Optional[datetime] = None,
created_before: Optional[datetime] = None,
created_after: Optional[datetime] = None,
event_type: Optional[str] = None,
):
query = {
'event_id': int(event_id) if event_id else None,
'document_id': int(document_id) if document_id else None,
'document_seal': int(document_seal) if document_seal else None,
'user_id': int(user_id) if user_id else None,
'user_name': user_name,
'event_type': event_type,
'created_after': created_after.isoformat() if created_after else None,
'created_before': created_before.isoformat() if created_before else None,
'occurred_after': occurred_after.isoformat() if occurred_after else None,
'occurred_before': occurred_before.isoformat() if occurred_before else None,
}
query = {k: v for k, v in query.items() if v is not None}
route = self.router.route('GET', '/events')
return await self.request(route, query=query)
async def _create_event(
self,
**create_args
):
route = self.router.route('POST', '/events')
return await self.request(route, payload=create_args)
async def create_event(
self,
user_name: str,
occurred_at: datetime,
event_type: str,
document_id: Optional[int] = None,
document: Optional[dict] = None,
user_id: Optional[int] = None,
user: Optional[dict] = None,
**kwargs
):
if document_id is not None and document is not None:
raise ValueError("Cannot provide both document and document_id.")
if user_id is not None and user is not None:
raise ValueError("Cannot provide both user and user_id.")
if user_id is None and user is None:
raise ValueError("Must provide exactly one of user or user_id.")
args = {
'user_name': user_name,
'occurred_at': occurred_at.isoformat(),
'event_type': event_type,
'document_id': document_id,
'document': document,
'user_id': user_id,
'user': user,
} | kwargs
args = {k: v for k, v in args.items() if v is not None}
return await self._create_event(**args)
async def create_plain_event(
self, *,
message: str,
**kwargs
):
return await self.create_event(
event_type='plain',
message=message,
**kwargs
)
async def create_sub_event(
self, *,
tier: int,
subscribed_length: int,
message: Optional[str] = None,
**kwargs
):
return await self.create_event(
event_type='subscriber',
tier=tier,
subscribed_length=subscribed_length,
message=message,
**kwargs
)
async def create_cheer_event(
self, *,
amount: int,
cheer_type: Optional[str] = None,
message: Optional[str] = None,
**kwargs
):
return await self.create_event(
event_type='cheer',
amount=amount,
cheer_type=cheer_type,
message=message,
**kwargs
)
async def create_raid_event(
self, *,
viewer_count: int,
**kwargs
):
return await self.create_event(
event_type='raid',
viewer_count=viewer_count,
**kwargs
)
async def get_event(self, event_id: int):
route = self.router.route('GET', '/events/{event_id}', event_id=event_id)
return await self.request(route)
async def update_event(
self, event_id: int, **kwargs
):
# Haven't bothered to add the arguments explicitly here
route = self.router.route('PATCH', '/events/{event_id}', event_id=event_id)
return await self.request(route, payload=kwargs)
async def delete_event(self, event_id: int):
route = self.router.route('DELETE', '/events/{event_id}', event_id=event_id)
return await self.request(route)
async def get_event_document(self, event_id: int):
route = self.router.route('GET', '/events/{event_id}/document', event_id=event_id)
return await self.request(route)
async def get_event_user(self, event_id: int):
route = self.router.route('GET', '/events/{event_id}/user', event_id=event_id)
return await self.request(route)

40
src/demo.py Normal file
View File

@@ -0,0 +1,40 @@
import asyncio
import sys
import argparse
import logging
from typing import Optional
import aiohttp
from client import DemoClient
from demos import run_demos
parser = argparse.ArgumentParser()
parser.add_argument(
'--url',
dest='base_url',
default="https://dreams.thewisewolf.dev",
help="Base API URL to use."
)
parser.add_argument(
'--token',
dest='token',
help="API Token to use to authenticate with Dreamspace API"
)
logger = logging.getLogger()
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
async def main():
# Read url and auth token from args
args = parser.parse_args()
async with aiohttp.ClientSession() as session:
client = DemoClient(args.base_url, args.token, session=session)
await run_demos(client)
if __name__ == '__main__':
asyncio.run(main())

202
src/demos.py Normal file
View File

@@ -0,0 +1,202 @@
from datetime import datetime
import os
from random import randint
from client import DemoClient
from drawing import generate_sub_card, generate_plain_card, generate_cheer_card, generate_raid_card, stamp_card, load_image, serialise_image
user1 = {
"twitch_id": "105474571",
"name": "alllenski",
"preferences": "Hamburgers",
}
user2 = {
"twitch_id": "623539907",
"name": "holonomy",
"preferences": "cats",
}
async def spawn_specimen(client: DemoClient):
# Spawn a specimen
...
async def printer_redeem(client: DemoClient, twitch_id, name, seal, message):
"""
Here we use a 'stateless' approach to responding to a printer redeem,
generating a 'PLAIN' event from a twitch_id, name, message, and document seal.
This sends a single POST request to /events
This also returns the generated Event for use in e.g. stamping.
"""
# Generate a dummy Image with this message
card = generate_plain_card(message)
# Convert image to base64 string
image_payload = serialise_image(card)
# Send a POST request to /events with the event data
event = await client.create_plain_event(
message=message,
user={'twitch_id': twitch_id, 'name': name},
user_name=name,
occurred_at=datetime.now(),
document={'document_data': image_payload, 'seal': seal}
)
return event
async def subscription_event(client: DemoClient, twitch_id, name, seal, tier, length, message = None):
"""
Similar to printer_redeem, this uses a stateless approach to responding to a subscription event.
"""
card = generate_sub_card(name, tier, length, message)
image_payload = serialise_image(card)
event = await client.create_sub_event(
tier=tier,
subscribed_length=length,
message=message,
user={'twitch_id': twitch_id, 'name': name},
user_name=name,
occurred_at=datetime.now(),
document={'document_data': image_payload, 'seal': seal}
)
return event
async def cheer_event(client: DemoClient, twitch_id, name, seal, amount, message = None):
"""
Similar to printer_redeem, this uses a stateless approach to responding to a bit cheer event.
"""
card = generate_cheer_card(name, amount)
image_payload = serialise_image(card)
event = await client.create_cheer_event(
amount=amount,
message=message,
user={'twitch_id': twitch_id, 'name': name},
user_name=name,
occurred_at=datetime.now(),
document={'document_data': image_payload, 'seal': seal}
)
return event
async def raid_event(client: DemoClient, twitch_id, name, seal, visitors):
"""
Similar to printer_redeem, this uses a stateless approach to responding to a bit cheer event.
"""
card = generate_raid_card(name, visitors)
image_payload = serialise_image(card)
event = await client.create_raid_event(
viewer_count=visitors,
user={'twitch_id': twitch_id, 'name': name},
user_name=name,
occurred_at=datetime.now(),
document={'document_data': image_payload, 'seal': seal}
)
return event
async def add_stamp(client: DemoClient, event, stamp, pos_x, pos_y, rotation):
"""
Here we accept an Event payload, and add a stamp at the given position.
This sends a single PATCH request to /document/{document_id}
and updates both the document data and stamps at once.
Caution: This has the downside that it *overwrites* the existing stamps on the document,
using the stamps in event['document']['stamps'], plus the additional stamp.
This also fetches the updated 'event' and returns it.
"""
image_data = event['document']['document_data']
image = load_image(image_data)
image = stamp_card(image, stamp, pos_x, pos_y, rotation)
image_payload = serialise_image(image)
stamp = {'stamp_type': stamp, 'pos_x': pos_x, 'pos_y': pos_y, 'rotation': rotation}
await client.update_document(
document_id=event['document_id'],
document_data=image_payload,
stamps=event['document']['stamps'] + [stamp],
)
event = await client.get_event(event['event_id'])
return event
async def save_user_documents(client: DemoClient, twitch_id: str):
"""
This saves all the user event documents for the user with the given twitch_id in a folder.
"""
# First fetch the user with this id
users = await client.get_users(twitch_id=twitch_id)
if not users:
# No users exist with this twitchid, finish early
return
user = users[0]
# Get all the events for this user
events = await client.get_user_events(user['user_id'])
# Create a folder for the user if it doesn't exist
folder = f"users/{user['name']}/documents/"
if not os.path.exists(folder):
os.makedirs(folder)
# Go through each event and save the document
for event in events:
# Extract the document data
# Note that we cannot assume the event *has* a document
data = event.get('document', {}).get('document_data')
if data:
try:
name = event['event_type'] + '_' + str(event['event_id'])
image = load_image(data)
image.save(folder + name + '.png')
except Exception:
pass
# -------
# Demo Flows
# -------
async def printer_demo(client: DemoClient):
event = await printer_redeem(client, user1['twitch_id'], user1['name'], randint(0, 1000), "HAMBURGERS!")
event = await add_stamp(client, event, 'APPROVE', 64, 0, 45)
event = await add_stamp(client, event, 'DENY', 180, 0, -30)
async def raid_demo(client: DemoClient):
event = await raid_event(client, user1['twitch_id'], user1['name'], randint(0, 1000), 314)
event = await add_stamp(client, event, 'APPROVE', 150, 0, 20)
async def sub_demo(client: DemoClient):
event = await subscription_event(client, user1['twitch_id'], user1['name'], randint(0, 1000), tier=4, length=10, message="This Is Hamburgers!")
event = await add_stamp(client, event, 'APPROVE', 150, 10, 20)
async def cheer_demo(client: DemoClient):
event = await cheer_event(client, user1['twitch_id'], user1['name'], randint(0, 1000), amount=1024, message="Blorple")
event = await add_stamp(client, event, 'DENY', 150, 10, -20)
async def wipe_users(client: DemoClient):
users = await client.get_users()
for user in users:
await client.delete_user(user['user_id'])
async def save_demo(client: DemoClient):
await save_user_documents(client, user1['twitch_id'])
async def run_demos(client: DemoClient):
await wipe_users(client)
await printer_demo(client)
await cheer_demo(client)
await raid_demo(client)
await sub_demo(client)
await save_demo(client)

70
src/drawing.py Normal file
View File

@@ -0,0 +1,70 @@
from io import BytesIO
import base64
from typing import Optional
from PIL import ImageDraw, Image
__all__ = [
'generate_plain_card',
'generate_sub_card',
'generate_cheer_card',
'generate_raid_card',
'stamp_card',
'serialise_image',
'load_image',
]
def generate_plain_card(message: str):
card = Image.new('RGBA', (256, 64), color="#418141")
draw = ImageDraw.Draw(card)
draw.text((2, 2), 'PLAIN CARD', font_size=15)
draw.text((32, 32), message,)
return card
def generate_sub_card(name: str, tier: int, length: int, message: Optional[str] = None):
card = Image.new('RGBA', (256, 128), color="#418141")
draw = ImageDraw.Draw(card)
draw.text((2, 2), 'SUBSCRIBER CARD', font_size=15)
draw.text((32, 32), f"Name: {name}")
draw.text((32, 48), f"Tier: {tier}")
draw.text((32, 64), f"Length: {length}")
if message:
draw.text((32, 80), f"Message: {message}")
return card
def generate_cheer_card(name: str, amount: int):
card = Image.new('RGBA', (256, 64), color="#418141")
draw = ImageDraw.Draw(card)
draw.text((2, 2), 'CHEER CARD', font_size=15)
draw.text((32, 32), f"Name: {name}")
draw.text((32, 48), f"Amount: {amount}")
return card
def generate_raid_card(name: str, visitors: int):
card = Image.new('RGBA', (256, 64), color="#418141")
draw = ImageDraw.Draw(card)
draw.text((2, 2), 'RAID CARD', font_size=15)
draw.text((32, 32), f"Name: {name}")
draw.text((32, 48), f"Visitors: {visitors}")
return card
def stamp_card(card: Image.Image, stamp: str, pos_x, pos_y, rotation):
draw = ImageDraw.Draw(card)
x0, y0, x1, y1 = draw.textbbox((0, 0), text=stamp, font_size=20)
canvas = Image.new('RGBA', (int(x1+1), int(y1+1)))
draw = ImageDraw.Draw(canvas)
draw.text((0, 0), text=stamp, fill='#F11111', stroke_width=1, font_size=20)
canvas = canvas.rotate(rotation, expand=True)
card.alpha_composite(canvas, (pos_x % card.width, pos_y % card.height))
return card
def serialise_image(image):
buffer = BytesIO()
image.save(buffer, 'PNG')
byts = base64.b64encode(buffer.getvalue())
return byts.decode('utf-8')
def load_image(imagestr: str):
byts = BytesIO(base64.b64decode(imagestr))
return Image.open(byts)