From 0d831d10f4847eebb13e30d8163248c6d0ab6db9 Mon Sep 17 00:00:00 2001 From: Interitio Date: Wed, 11 Jun 2025 19:42:22 +1000 Subject: [PATCH] Initial Demo client and flows. --- requirements.txt | 2 + src/client.py | 590 +++++++++++++++++++++++++++++++++++++++++++++++ src/demo.py | 40 ++++ src/demos.py | 202 ++++++++++++++++ src/drawing.py | 70 ++++++ 5 files changed, 904 insertions(+) create mode 100644 requirements.txt create mode 100644 src/client.py create mode 100644 src/demo.py create mode 100644 src/demos.py create mode 100644 src/drawing.py diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5b44629 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +aiohttp +pillow diff --git a/src/client.py b/src/client.py new file mode 100644 index 0000000..f1f20af --- /dev/null +++ b/src/client.py @@ -0,0 +1,590 @@ +import json +import logging +import asyncio +from datetime import datetime +from typing import NamedTuple, Optional + +from aiohttp import ClientSession + +logger = logging.getLogger(__name__) + + +class HTTPException(Exception): + def __init__(self, response, message): + self.response = response + self.stats = response.status + + error = "{} {}: {}".format(response.status, response.reason, message) + super().__init__(error) + + +class Route(NamedTuple): + path: str + method: str + params: dict[str, int|str] + url: str + + def url_for(self): + return self.url.format(**self.params) + + +class Router: + def __init__(self, base_url: str): + self.base_url = base_url + + def route(self, method, path, **params) -> Route: + url = self.base_url + path + route = Route( + method=method, + path=path, + params=params, + url=url, + ) + return route + + +class DemoClient: + user_agent = "Dreamspace Demo API client written by Interitio (cona@thewisewolf.dev)" + + def __init__(self, base_url: str, apikey: str, session: ClientSession): + self.router = Router(base_url) + self.apikey = apikey + + self.session = session + + async def request(self, route: Route, payload=None, query=None, **kwargs): + if self.session.closed: + raise ValueError("Session is closed or not started.") + + headers = { + "Content-Type": "application/json", + "Accept": "*/*", + "User-Agent": self.user_agent, + "X-API-KEY": self.apikey, + } + data = json.dumps(payload) if payload is not None else None + params = query + + logger.info(f"Sending {route.method} to '{route.url_for()}' with {params=} and {data=}") + async with self.session.request(route.method, route.url_for(), headers=headers, data=data, params=params) as resp: + text = await resp.text(encoding='utf-8') + logger.debug(f"{route.url} response {resp.status}: {text}") + + if 300 > resp.status >= 200: + return json.loads(text) + else: + raise HTTPException(resp, text) + + # -------------------- + # Users Chapter + # -------------------- + async def get_users( + self, + user_id: Optional[int]=None, + twitch_id: Optional[str]=None, + name: Optional[str]=None, + created_after: Optional[datetime]=None, + created_before: Optional[datetime]=None, + ): + query = { + 'user_id': str(user_id) if user_id else None, + 'twitch_id': twitch_id, + 'name': name, + 'created_after': created_after.isoformat() if created_after else None, + 'created_before': created_before.isoformat() if created_before else None, + } + query = {k: v for k, v in query.items() if v is not None} + + route = self.router.route('GET', '/users') + return await self.request(route, query=query) + + async def create_user( + self, + twitch_id: str, + name: str, + preferences: Optional[str] = None + ): + payload = {'twitch_id': twitch_id, 'name': name} + if preferences is not None: + payload['preferences'] = preferences + + route = self.router.route('POST', '/users') + return await self.request(route, payload=payload) + + async def get_user(self, user_id: int): + route = self.router.route('GET', '/users/{user_id}', user_id=user_id) + return await self.request(route) + + async def update_user(self, user_id: int, name: Optional[str] = None, preferences: Optional[str] = None): + payload = {} + if name is not None: + payload['name'] = name + if preferences is not None: + payload['preferences'] = preferences + + route = self.router.route('PATCH', '/users/{user_id}', user_id=user_id) + return await self.request(route) + + async def delete_user(self, user_id: int): + route = self.router.route('DELETE', '/users/{user_id}', user_id=user_id) + return await self.request(route) + + async def get_user_events(self, user_id: int): + # This supports the same filters as get_events, but we don't implement those here + # We could also implement create_user_event with subtypes, but we don't + route = self.router.route('GET', '/users/{user_id}/events', user_id=user_id) + return await self.request(route) + + async def get_user_specimen(self, user_id: int): + route = self.router.route('GET', '/users/{user_id}/specimen', user_id=user_id) + return await self.request(route) + + async def create_user_specimen( + self, user_id: int, + born_at: Optional[datetime] = None, forgotten_at: Optional[datetime] = None + ): + payload = {} + if born_at is not None: + payload['born_at'] = born_at.isoformat() + if forgotten_at is not None: + payload['forgotten_at'] = forgotten_at.isoformat() + + route = self.router.route('POST', '/users/{user_id}/specimen', user_id=user_id) + return await self.request(route, payload=payload) + + async def get_user_transactions(self, user_id: int): + route = self.router.route('GET', '/users/{user_id}/transactions', user_id=user_id) + return await self.request(route) + + # -------------------- + # Specimens Chapter + # -------------------- + async def get_specimens( + self, + specimen_id: Optional[int] = None, + owner_id: Optional[int] = None, + born_after: Optional[datetime]=None, + born_before: Optional[datetime]=None, + forgotten: Optional[bool]=None, + forgotten_after: Optional[datetime]=None, + forgotten_before: Optional[datetime]=None, + ): + query = { + 'specimen_id': str(specimen_id) if specimen_id is not None else None, + 'owner_id': str(owner_id) if owner_id is not None else None, + 'born_after': born_after.isoformat() if born_after else None, + 'born_before': born_before.isoformat() if born_before else None, + 'forgotten': ('true' if forgotten else 'false') if forgotten is not None else None, + 'forgotten_after': forgotten_after.isoformat() if forgotten_after else None, + 'forgotten_before': forgotten_before.isoformat() if forgotten_before else None, + } + query = {k: v for k, v in query.items() if v is not None} + + route = self.router.route('GET', '/specimens') + return await self.request(route, query=query) + + async def create_specimen( + self, + owner_id: int, + born_at: Optional[datetime] = None, + forgotten_at: Optional[datetime] = None, + ): + payload = {'owner_id': owner_id} + if born_at is not None: + payload['born_at'] = born_at.isoformat() + if forgotten_at is not None: + payload['forgotten_at'] = forgotten_at.isoformat() + + route = self.router.route('POST', '/specimens') + return await self.request(route, payload=payload) + + async def create_specimen_stateless( + self, + owner: dict, + born_at: Optional[datetime] = None, + forgotten_at: Optional[datetime] = None, + ): + payload = {'owner': owner} + if born_at is not None: + payload['born_at'] = born_at.isoformat() + if forgotten_at is not None: + payload['forgotten_at'] = forgotten_at.isoformat() + + route = self.router.route('POST', '/specimens') + return await self.request(route, payload=payload) + + async def get_specimen(self, specimen_id: int): + route = self.router.route('GET', '/specimens/{specimen_id}', specimen_id=specimen_id) + return await self.request(route) + + async def update_specimen( + self, + specimen_id: int, + owner_id: Optional[int] = None, + forgotten_at: Optional[datetime] = None + ): + payload = {} + if owner_id is not None: + payload['owner_id'] = owner_id + if forgotten_at is not None: + payload['forgotten_at'] = forgotten_at.isoformat() + + route = self.router.route('PATCH', '/specimens/{specimen_id}', specimen_id=specimen_id) + return await self.request(route) + + async def forget_specimen(self, specimen_id: int): + await self.update_specimen(specimen_id, forgotten_at=datetime.now()) + + async def delete_specimen(self, specimen_id: int): + route = self.router.route('DELETE', '/specimens/{specimen_id}', specimen_id=specimen_id) + return await self.request(route) + + async def get_specimen_owner(self, specimen_id: int): + route = self.router.route('GET', '/specimens/{specimen_id}/owner', specimen_id=specimen_id) + return await self.request(route) + + # -------------------- + # Documents Chapter + # -------------------- + async def get_documents( + self, + document_id: Optional[int] = None, + seal: Optional[int] = None, + created_before: Optional[datetime] = None, + created_after: Optional[datetime] = None, + metadata: Optional[str] = None, + stamp_type: Optional[str] = None, + ): + query = { + 'document_id': str(document_id) if document_id is not None else None, + 'seal': str(seal) if seal is not None else None, + 'created_after': created_after.isoformat() if created_after else None, + 'created_before': created_before.isoformat() if created_before else None, + 'metadata': metadata, + 'stamp_type': stamp_type, + } + query = {k: v for k, v in query.items() if v is not None} + + route = self.router.route('GET', '/documents') + return await self.request(route, query=query) + + async def create_document( + self, + document_data: str, + seal: int, + metadata: Optional[str] = None, + stamps: list[dict] = [], + ): + payload = { + 'document_data': document_data, + 'seal': seal, + 'metadata': metadata, + 'stamps': stamps, + } + route = self.router.route('POST', '/documents') + return await self.request(route, payload=payload) + + async def get_document(self, document_id: int): + route = self.router.route('GET', '/documents/{document_id}', document_id=document_id) + return await self.request(route) + + async def update_document( + self, + document_id: int, + document_data: Optional[str] = None, + seal: Optional[int] = None, + metadata: Optional[str] = None, + stamps: list[dict] = [], + ): + payload = { + 'document_data': document_data, + 'seal': str(seal) if seal is not None else None, + 'metadata': metadata, + 'stamps': stamps, + } + payload = {k: v for k, v in payload.items() if v} + route = self.router.route('PATCH', '/documents/{document_id}', document_id=document_id) + return await self.request(route, payload=payload) + + async def delete_document(self, document_id: int): + route = self.router.route('DELETE', '/documents/{document_id}', document_id=document_id) + return await self.request(route) + + async def get_document_stamps(self, document_id: int): + route = self.router.route('GET', '/documents/{document_id}/stamps', document_id=document_id) + return await self.request(route) + + async def post_document_stamps(self, document_id: int, **stamp_args): + route = self.router.route('POST', '/documents/{document_id}/stamps', document_id=document_id) + return await self.request(route, payload=stamp_args) + + async def put_document_stamps(self, document_id: int, *stamp_args): + route = self.router.route('PUT', '/documents/{document_id}/stamps', document_id=document_id) + return await self.request(route, payload=stamp_args) + + # -------------------- + # Stamps Chapter + # -------------------- + async def get_stamps( + self, + stamp_id: Optional[int] = None, + document_id: Optional[int] = None, + stamp_type: Optional[str] = None, + ): + query = { + 'stamp_id': str(stamp_id) if stamp_id is not None else None, + 'document_id': str(document_id) if document_id is not None else None, + 'stamp_type': stamp_type + } + query = {k: v for k, v in query.items() if v is not None} + + route = self.router.route('GET', '/stamps') + return await self.request(route, query=query) + + async def create_stamp( + self, + document_id: int, + stamp_type: str, + pos_x: int, + pos_y: int, + rotation: float, + ): + payload = { + 'document_id': document_id, + 'stamp_type': stamp_type, + 'pos_x': pos_x, + 'pos_y': pos_y, + 'rotation': rotation, + } + route = self.router.route('POST', '/stamps') + return await self.request(route, payload=payload) + + async def create_stamps(self, *stamp_args): + route = self.router.route('PUT', '/stamps') + return await self.request(route, payload=stamp_args) + + async def get_stamp(self, stamp_id: int): + route = self.router.route('GET', '/stamps/{stamp_id}', stamp_id=stamp_id) + return await self.request(route) + + async def update_stamp( + self, + stamp_id: int, + document_id: Optional[int] = None, + stamp_type: Optional[str] = None, + pos_x: Optional[int] = None, + pos_y: Optional[int] = None, + rotation: Optional[float] = None, + ): + payload = { + 'document_id': document_id, + 'stamp_type': stamp_type, + 'pos_x': pos_x, + 'pos_y': pos_y, + 'rotation': rotation, + } + payload = {k: v for k, v in payload.items() if v is not None} + route = self.router.route('PATCH', '/stamps/{stamp_id}', stamp_id=stamp_id) + return await self.request(route, payload=payload) + + async def delete_stamp(self, stamp_id: int): + route = self.router.route('DELETE', '/stamps/{stamp_id}', stamp_id=stamp_id) + return await self.request(route) + + # -------------------- + # Transactions Chapter + # -------------------- + async def get_transactions( + self, + transaction_id: Optional[int] = None, + user_id: Optional[int] = None, + description: Optional[str] = None, + reference: Optional[str] = None, + created_before: Optional[datetime] = None, + created_after: Optional[datetime] = None, + ): + query = { + 'transaction_id': int(transaction_id) if transaction_id else None, + 'user_id': int(user_id) if user_id else None, + 'description': description, + 'reference': reference, + 'created_after': created_after.isoformat() if created_after else None, + 'created_before': created_before.isoformat() if created_before else None, + } + query = {k: v for k, v in query.items() if v is not None} + + route = self.router.route('GET', '/transactions') + return await self.request(route, query=query) + + async def create_transaction( + self, + user_id: int, + amount: int, + description: str, + reference: Optional[str] = None, + ): + payload = { + 'user_id': user_id, + 'amount': amount, + 'description': description, + } + if reference is not None: + payload['reference'] = reference + + route = self.router.route('POST', '/transactions') + return await self.request(route, payload=payload) + + async def get_transaction(self, transaction_id: int): + route = self.router.route('GET', '/transactions/{transaction_id}', transaction_id=transaction_id) + return await self.request(route) + + async def get_transaction_user(self, transaction_id: int): + route = self.router.route('GET', '/transactions/{transaction_id}/user', transaction_id=transaction_id) + return await self.request(route) + + # -------------------- + # Events Chapter + # -------------------- + async def get_events( + self, + event_id: Optional[int] = None, + document_id: Optional[int] = None, + document_seal: Optional[int] = None, + user_id: Optional[int] = None, + user_name: Optional[str] = None, + occurred_before: Optional[datetime] = None, + occurred_after: Optional[datetime] = None, + created_before: Optional[datetime] = None, + created_after: Optional[datetime] = None, + event_type: Optional[str] = None, + ): + query = { + 'event_id': int(event_id) if event_id else None, + 'document_id': int(document_id) if document_id else None, + 'document_seal': int(document_seal) if document_seal else None, + 'user_id': int(user_id) if user_id else None, + 'user_name': user_name, + 'event_type': event_type, + 'created_after': created_after.isoformat() if created_after else None, + 'created_before': created_before.isoformat() if created_before else None, + 'occurred_after': occurred_after.isoformat() if occurred_after else None, + 'occurred_before': occurred_before.isoformat() if occurred_before else None, + } + query = {k: v for k, v in query.items() if v is not None} + + route = self.router.route('GET', '/events') + return await self.request(route, query=query) + + async def _create_event( + self, + **create_args + ): + route = self.router.route('POST', '/events') + return await self.request(route, payload=create_args) + + async def create_event( + self, + user_name: str, + occurred_at: datetime, + event_type: str, + document_id: Optional[int] = None, + document: Optional[dict] = None, + user_id: Optional[int] = None, + user: Optional[dict] = None, + **kwargs + ): + if document_id is not None and document is not None: + raise ValueError("Cannot provide both document and document_id.") + if user_id is not None and user is not None: + raise ValueError("Cannot provide both user and user_id.") + if user_id is None and user is None: + raise ValueError("Must provide exactly one of user or user_id.") + + args = { + 'user_name': user_name, + 'occurred_at': occurred_at.isoformat(), + 'event_type': event_type, + 'document_id': document_id, + 'document': document, + 'user_id': user_id, + 'user': user, + } | kwargs + args = {k: v for k, v in args.items() if v is not None} + return await self._create_event(**args) + + async def create_plain_event( + self, *, + message: str, + **kwargs + ): + return await self.create_event( + event_type='plain', + message=message, + **kwargs + ) + + async def create_sub_event( + self, *, + tier: int, + subscribed_length: int, + message: Optional[str] = None, + **kwargs + ): + return await self.create_event( + event_type='subscriber', + tier=tier, + subscribed_length=subscribed_length, + message=message, + **kwargs + ) + + async def create_cheer_event( + self, *, + amount: int, + cheer_type: Optional[str] = None, + message: Optional[str] = None, + **kwargs + ): + return await self.create_event( + event_type='cheer', + amount=amount, + cheer_type=cheer_type, + message=message, + **kwargs + ) + + async def create_raid_event( + self, *, + viewer_count: int, + **kwargs + ): + return await self.create_event( + event_type='raid', + viewer_count=viewer_count, + **kwargs + ) + + async def get_event(self, event_id: int): + route = self.router.route('GET', '/events/{event_id}', event_id=event_id) + return await self.request(route) + + async def update_event( + self, event_id: int, **kwargs + ): + # Haven't bothered to add the arguments explicitly here + route = self.router.route('PATCH', '/events/{event_id}', event_id=event_id) + return await self.request(route, payload=kwargs) + + async def delete_event(self, event_id: int): + route = self.router.route('DELETE', '/events/{event_id}', event_id=event_id) + return await self.request(route) + + async def get_event_document(self, event_id: int): + route = self.router.route('GET', '/events/{event_id}/document', event_id=event_id) + return await self.request(route) + + async def get_event_user(self, event_id: int): + route = self.router.route('GET', '/events/{event_id}/user', event_id=event_id) + return await self.request(route) + + diff --git a/src/demo.py b/src/demo.py new file mode 100644 index 0000000..0ae3c3c --- /dev/null +++ b/src/demo.py @@ -0,0 +1,40 @@ +import asyncio +import sys +import argparse +import logging +from typing import Optional + +import aiohttp + +from client import DemoClient +from demos import run_demos + +parser = argparse.ArgumentParser() +parser.add_argument( + '--url', + dest='base_url', + default="https://dreams.thewisewolf.dev", + help="Base API URL to use." +) +parser.add_argument( + '--token', + dest='token', + help="API Token to use to authenticate with Dreamspace API" +) + +logger = logging.getLogger() +logger.addHandler(logging.StreamHandler()) +logger.setLevel(logging.DEBUG) + + + + +async def main(): + # Read url and auth token from args + args = parser.parse_args() + async with aiohttp.ClientSession() as session: + client = DemoClient(args.base_url, args.token, session=session) + await run_demos(client) + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/src/demos.py b/src/demos.py new file mode 100644 index 0000000..ba630ad --- /dev/null +++ b/src/demos.py @@ -0,0 +1,202 @@ +from datetime import datetime +import os +from random import randint +from client import DemoClient +from drawing import generate_sub_card, generate_plain_card, generate_cheer_card, generate_raid_card, stamp_card, load_image, serialise_image + + +user1 = { + "twitch_id": "105474571", + "name": "alllenski", + "preferences": "Hamburgers", +} + +user2 = { + "twitch_id": "623539907", + "name": "holonomy", + "preferences": "cats", +} + + +async def spawn_specimen(client: DemoClient): + # Spawn a specimen + ... + + +async def printer_redeem(client: DemoClient, twitch_id, name, seal, message): + """ + Here we use a 'stateless' approach to responding to a printer redeem, + generating a 'PLAIN' event from a twitch_id, name, message, and document seal. + + This sends a single POST request to /events + + This also returns the generated Event for use in e.g. stamping. + """ + # Generate a dummy Image with this message + card = generate_plain_card(message) + + # Convert image to base64 string + image_payload = serialise_image(card) + + # Send a POST request to /events with the event data + event = await client.create_plain_event( + message=message, + user={'twitch_id': twitch_id, 'name': name}, + user_name=name, + occurred_at=datetime.now(), + document={'document_data': image_payload, 'seal': seal} + ) + return event + + +async def subscription_event(client: DemoClient, twitch_id, name, seal, tier, length, message = None): + """ + Similar to printer_redeem, this uses a stateless approach to responding to a subscription event. + """ + card = generate_sub_card(name, tier, length, message) + image_payload = serialise_image(card) + event = await client.create_sub_event( + tier=tier, + subscribed_length=length, + message=message, + user={'twitch_id': twitch_id, 'name': name}, + user_name=name, + occurred_at=datetime.now(), + document={'document_data': image_payload, 'seal': seal} + ) + return event + + +async def cheer_event(client: DemoClient, twitch_id, name, seal, amount, message = None): + """ + Similar to printer_redeem, this uses a stateless approach to responding to a bit cheer event. + """ + card = generate_cheer_card(name, amount) + image_payload = serialise_image(card) + event = await client.create_cheer_event( + amount=amount, + message=message, + user={'twitch_id': twitch_id, 'name': name}, + user_name=name, + occurred_at=datetime.now(), + document={'document_data': image_payload, 'seal': seal} + ) + return event + + +async def raid_event(client: DemoClient, twitch_id, name, seal, visitors): + """ + Similar to printer_redeem, this uses a stateless approach to responding to a bit cheer event. + """ + card = generate_raid_card(name, visitors) + image_payload = serialise_image(card) + event = await client.create_raid_event( + viewer_count=visitors, + user={'twitch_id': twitch_id, 'name': name}, + user_name=name, + occurred_at=datetime.now(), + document={'document_data': image_payload, 'seal': seal} + ) + return event + + +async def add_stamp(client: DemoClient, event, stamp, pos_x, pos_y, rotation): + """ + Here we accept an Event payload, and add a stamp at the given position. + + This sends a single PATCH request to /document/{document_id} + and updates both the document data and stamps at once. + + Caution: This has the downside that it *overwrites* the existing stamps on the document, + using the stamps in event['document']['stamps'], plus the additional stamp. + + This also fetches the updated 'event' and returns it. + """ + image_data = event['document']['document_data'] + image = load_image(image_data) + + image = stamp_card(image, stamp, pos_x, pos_y, rotation) + image_payload = serialise_image(image) + + stamp = {'stamp_type': stamp, 'pos_x': pos_x, 'pos_y': pos_y, 'rotation': rotation} + await client.update_document( + document_id=event['document_id'], + document_data=image_payload, + stamps=event['document']['stamps'] + [stamp], + ) + event = await client.get_event(event['event_id']) + return event + +async def save_user_documents(client: DemoClient, twitch_id: str): + """ + This saves all the user event documents for the user with the given twitch_id in a folder. + """ + # First fetch the user with this id + users = await client.get_users(twitch_id=twitch_id) + if not users: + # No users exist with this twitchid, finish early + return + user = users[0] + + # Get all the events for this user + events = await client.get_user_events(user['user_id']) + + # Create a folder for the user if it doesn't exist + folder = f"users/{user['name']}/documents/" + if not os.path.exists(folder): + os.makedirs(folder) + + # Go through each event and save the document + for event in events: + # Extract the document data + # Note that we cannot assume the event *has* a document + data = event.get('document', {}).get('document_data') + if data: + try: + name = event['event_type'] + '_' + str(event['event_id']) + image = load_image(data) + image.save(folder + name + '.png') + except Exception: + pass + + +# ------- +# Demo Flows +# ------- + +async def printer_demo(client: DemoClient): + event = await printer_redeem(client, user1['twitch_id'], user1['name'], randint(0, 1000), "HAMBURGERS!") + event = await add_stamp(client, event, 'APPROVE', 64, 0, 45) + event = await add_stamp(client, event, 'DENY', 180, 0, -30) + +async def raid_demo(client: DemoClient): + event = await raid_event(client, user1['twitch_id'], user1['name'], randint(0, 1000), 314) + event = await add_stamp(client, event, 'APPROVE', 150, 0, 20) + +async def sub_demo(client: DemoClient): + event = await subscription_event(client, user1['twitch_id'], user1['name'], randint(0, 1000), tier=4, length=10, message="This Is Hamburgers!") + event = await add_stamp(client, event, 'APPROVE', 150, 10, 20) + +async def cheer_demo(client: DemoClient): + event = await cheer_event(client, user1['twitch_id'], user1['name'], randint(0, 1000), amount=1024, message="Blorple") + event = await add_stamp(client, event, 'DENY', 150, 10, -20) + +async def wipe_users(client: DemoClient): + users = await client.get_users() + for user in users: + await client.delete_user(user['user_id']) + + +async def save_demo(client: DemoClient): + await save_user_documents(client, user1['twitch_id']) + + + +async def run_demos(client: DemoClient): + await wipe_users(client) + await printer_demo(client) + await cheer_demo(client) + await raid_demo(client) + await sub_demo(client) + await save_demo(client) + diff --git a/src/drawing.py b/src/drawing.py new file mode 100644 index 0000000..4175e5c --- /dev/null +++ b/src/drawing.py @@ -0,0 +1,70 @@ +from io import BytesIO +import base64 +from typing import Optional +from PIL import ImageDraw, Image + +__all__ = [ + 'generate_plain_card', + 'generate_sub_card', + 'generate_cheer_card', + 'generate_raid_card', + 'stamp_card', + 'serialise_image', + 'load_image', +] + + +def generate_plain_card(message: str): + card = Image.new('RGBA', (256, 64), color="#418141") + draw = ImageDraw.Draw(card) + draw.text((2, 2), 'PLAIN CARD', font_size=15) + draw.text((32, 32), message,) + return card + +def generate_sub_card(name: str, tier: int, length: int, message: Optional[str] = None): + card = Image.new('RGBA', (256, 128), color="#418141") + draw = ImageDraw.Draw(card) + draw.text((2, 2), 'SUBSCRIBER CARD', font_size=15) + draw.text((32, 32), f"Name: {name}") + draw.text((32, 48), f"Tier: {tier}") + draw.text((32, 64), f"Length: {length}") + if message: + draw.text((32, 80), f"Message: {message}") + return card + +def generate_cheer_card(name: str, amount: int): + card = Image.new('RGBA', (256, 64), color="#418141") + draw = ImageDraw.Draw(card) + draw.text((2, 2), 'CHEER CARD', font_size=15) + draw.text((32, 32), f"Name: {name}") + draw.text((32, 48), f"Amount: {amount}") + return card + +def generate_raid_card(name: str, visitors: int): + card = Image.new('RGBA', (256, 64), color="#418141") + draw = ImageDraw.Draw(card) + draw.text((2, 2), 'RAID CARD', font_size=15) + draw.text((32, 32), f"Name: {name}") + draw.text((32, 48), f"Visitors: {visitors}") + return card + +def stamp_card(card: Image.Image, stamp: str, pos_x, pos_y, rotation): + draw = ImageDraw.Draw(card) + x0, y0, x1, y1 = draw.textbbox((0, 0), text=stamp, font_size=20) + canvas = Image.new('RGBA', (int(x1+1), int(y1+1))) + draw = ImageDraw.Draw(canvas) + draw.text((0, 0), text=stamp, fill='#F11111', stroke_width=1, font_size=20) + canvas = canvas.rotate(rotation, expand=True) + + card.alpha_composite(canvas, (pos_x % card.width, pos_y % card.height)) + return card + +def serialise_image(image): + buffer = BytesIO() + image.save(buffer, 'PNG') + byts = base64.b64encode(buffer.getvalue()) + return byts.decode('utf-8') + +def load_image(imagestr: str): + byts = BytesIO(base64.b64decode(imagestr)) + return Image.open(byts)