From fd04b825f21ceeadffd7553c6f59f6fb6e39ebd4 Mon Sep 17 00:00:00 2001 From: Conatum Date: Thu, 3 Nov 2022 15:36:39 +0200 Subject: [PATCH] rewrite: Implement shard IPC server. --- bot/main.py | 9 +- bot/meta/LionBot.py | 4 + bot/meta/__init__.py | 2 + bot/meta/app.py | 33 +++++++ bot/meta/args.py | 34 +++++-- bot/meta/ipc/__init__.py | 0 bot/meta/ipc/client.py | 200 +++++++++++++++++++++++++++++++++++++++ bot/meta/ipc/server.py | 173 +++++++++++++++++++++++++++++++++ run.py | 2 +- run_server.py | 20 ++++ 10 files changed, 460 insertions(+), 17 deletions(-) create mode 100644 bot/meta/app.py create mode 100644 bot/meta/ipc/__init__.py create mode 100644 bot/meta/ipc/client.py create mode 100644 bot/meta/ipc/server.py create mode 100644 run_server.py diff --git a/bot/main.py b/bot/main.py index d3b0e561..309cb57e 100644 --- a/bot/main.py +++ b/bot/main.py @@ -4,7 +4,7 @@ import logging import discord from discord.ext import commands -from meta import LionBot, conf, sharding +from meta import LionBot, conf, sharding, appname, shard_talk from meta.logger import log_context, log_action from data import Database @@ -15,12 +15,6 @@ from constants import DATA_VERSION # Note: This MUST be imported after core, due to table definition orders # from settings import AppSettings - -# Load and attach app specific data -if sharding.sharded: - appname = f"{conf.data['appid']}_{sharding.shard_count}_{sharding.shard_number}" -else: - appname = conf.data['appid'] log_context.set(f"APP: {appname}") # client.appdata = core.data.meta.fetch_or_create(appname) @@ -62,6 +56,7 @@ async def main(): config=conf, initial_extensions=['modules'], web_client=None, + app_ipc=shard_talk, testing_guilds=[889875661848723456] ) as lionbot: log_action.set("Launching") diff --git a/bot/meta/LionBot.py b/bot/meta/LionBot.py index 3bcbd603..30d4d3dc 100644 --- a/bot/meta/LionBot.py +++ b/bot/meta/LionBot.py @@ -18,6 +18,7 @@ class LionBot(commands.Bot): config: Conf, initial_extensions: List[str], web_client: ClientSession, + app_ipc, testing_guilds: List[int] = [], **kwargs, ): @@ -29,8 +30,11 @@ class LionBot(commands.Bot): self.appname = appname # self.appdata = appdata self.config = config + self.app_ipc = app_ipc async def setup_hook(self) -> None: + await self.app_ipc.connect() + for extension in self.initial_extensions: await self.load_extension(extension) diff --git a/bot/meta/__init__.py b/bot/meta/__init__.py index 0dcd1f57..fa9d999b 100644 --- a/bot/meta/__init__.py +++ b/bot/meta/__init__.py @@ -1,5 +1,7 @@ from .LionBot import LionBot from .config import conf from .args import args +from .app import appname, shard_talk from . import sharding from . import logger +from . import app diff --git a/bot/meta/app.py b/bot/meta/app.py new file mode 100644 index 00000000..42335703 --- /dev/null +++ b/bot/meta/app.py @@ -0,0 +1,33 @@ +from . import sharding, conf +from .logger import log_app +from .ipc.client import AppClient +from .args import args + + +def appname_from_shard(shardid): + appname = f"{conf.data['appid']}_{sharding.shard_count:02}_{shardid:02}" + return appname + + +def shard_from_appname(appname: str): + return int(appname.rsplit('_', maxsplit=1)[-1]) + + +if sharding.sharded: + appname = appname_from_shard(sharding.shard_number) +else: + appname = conf.data['appid'] + +log_app.set(appname) + + +shard_talk = AppClient( + appname, + {'host': args.host, 'port': args.port}, + {'host': conf.appipc['server_host'], 'port': int(conf.appipc['server_port'])} +) + + +@shard_talk.register_route() +async def ping(): + return "Pong!" diff --git a/bot/meta/args.py b/bot/meta/args.py index c2dd70d6..8d82a693 100644 --- a/bot/meta/args.py +++ b/bot/meta/args.py @@ -6,14 +6,30 @@ from constants import CONFIG_FILE # Parsed commandline arguments # ------------------------------ parser = argparse.ArgumentParser() -parser.add_argument('--conf', - dest='config', - default=CONFIG_FILE, - help="Path to configuration file.") -parser.add_argument('--shard', - dest='shard', - default=None, - type=int, - help="Shard number to run, if applicable.") +parser.add_argument( + '--conf', + dest='config', + default=CONFIG_FILE, + help="Path to configuration file." +) +parser.add_argument( + '--shard', + dest='shard', + default=None, + type=int, + help="Shard number to run, if applicable." +) +parser.add_argument( + '--host', + dest='host', + default='127.0.0.1', + help="IP address to run the app listener on." +) +parser.add_argument( + '--port', + dest='port', + default='5001', + help="Port to run the app listener on." +) args = parser.parse_args() diff --git a/bot/meta/ipc/__init__.py b/bot/meta/ipc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/bot/meta/ipc/client.py b/bot/meta/ipc/client.py new file mode 100644 index 00000000..d9127400 --- /dev/null +++ b/bot/meta/ipc/client.py @@ -0,0 +1,200 @@ +from typing import Optional +import asyncio +import logging +import pickle + + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + +class AppClient: + routes = {} # route_name -> Callable[Any, Awaitable[Any]] + + def __init__(self, appid, client_address, server_address): + self.appid = appid + self.address = client_address + self.server_address = server_address + + self.peers = {appid: client_address} # appid -> address + + self._listener: Optional[asyncio.Server] = None # Local client server + self._server = None # Connection to the registry server + + self.register_route('new_peer')(self.new_peer) + self.register_route('drop_peer')(self.drop_peer) + self.register_route('peer_list')(self.peer_list) + + def register_route(self, name=None): + def wrapper(coro): + route = AppRoute(coro, name) + self.routes[route.name] = route + return route + return wrapper + + async def server_connection(self): + """Establish a connection to the registry server""" + try: + reader, writer = await asyncio.open_connection(**self.server_address) + + payload = ('connect', (), {'appid': self.appid, 'address': self.address}) + writer.write(pickle.dumps(payload)) + writer.write(b'\n') + await writer.drain() + + data = await reader.readline() + peers = pickle.loads(data) + self.peers = peers + self._server = (reader, writer) + except Exception: + logger.exception("Could not connect to registry server. Trying again in 30 seconds.") + await asyncio.sleep(30) + asyncio.create_task(self.server_connection()) + else: + logger.info("Connected to the registry server, launching keepalive.") + asyncio.create_task(self._server_keepalive()) + + async def _server_keepalive(self): + if self._server is None: + raise ValueError("Cannot keepalive non-existent server!") + reader, write = self._server + try: + await reader.read() + except Exception: + logger.exception("Lost connection to address server. Reconnecting...") + else: + # Connection ended or broke + logger.info("Lost connection to address server. Reconnecting...") + await asyncio.sleep(30) + asyncio.create_task(self.server_connection()) + + async def new_peer(self, appid, address): + self.peers[appid] = address + + async def peer_list(self, peers): + self.peers = peers + + async def drop_peer(self, appid): + self.peers.pop(appid, None) + + async def close(self): + # Close connection to the server + # TODO + ... + + async def request(self, appid, payload: 'AppPayload'): + try: + if appid not in self.peers: + raise ValueError(f"Peer '{appid}' not found.") + logger.debug(f"Sending request to app '{appid}' with payload {payload}") + + address = self.peers[appid] + reader, writer = await asyncio.open_connection(**address) + + writer.write(payload.encoded()) + await writer.drain() + writer.write_eof() + result = await reader.read() + writer.close() + decoded = payload.route.decode(result) + return decoded + except Exception: + logging.exception(f"Failed to send request to {appid}'") + return None + + async def requestall(self, payload): + results = await asyncio.gather(*(self.request(appid, payload) for appid in self.peers)) + return dict(zip(self.peers.keys(), results)) + + async def handle_request(self, reader, writer): + data = await reader.read() + loaded = pickle.loads(data) + route, args, kwargs = loaded + + logger.debug(f"AppClient {self.appid} handling request on route '{route}' with args {args} and kwargs {kwargs}") + + if route in self.routes: + try: + await self.routes[route].run((reader, writer), args, kwargs) + except Exception: + logger.exception(f"Fatal exception during route '{route}'. This should never happen!") + else: + logger.warning(f"Appclient '{self.appid}' recieved unknown route {route}. Ignoring.") + writer.write_eof() + + async def connect(self): + """ + Start the local peer server. + Connect to the address server. + """ + # Start the client server + self._listener = await asyncio.start_server(self.handle_request, **self.address, start_serving=True) + + logger.info(f"Serving on {self.address}") + await self.server_connection() + + +class AppPayload: + __slots__ = ('route', 'args', 'kwargs') + + def __init__(self, route, *args, **kwargs): + self.route = route + self.args = args + self.kwargs = kwargs + + def __await__(self): + return self.route.execute(*self.args, **self.kwargs).__await__() + + def encoded(self): + return pickle.dumps((self.route.name, self.args, self.kwargs)) + + +class AppRoute: + __slots__ = ('func', 'name') + + def __init__(self, func, name=None): + self.func = func + self.name = name or func.__name__ + + def __call__(self, *args, **kwargs): + return AppPayload(self, *args, **kwargs) + + def encode(self, output): + return pickle.dumps(output) + + def decode(self, encoded): + # TODO: Handle exceptions here somehow + if len(encoded) > 0: + return pickle.loads(encoded) + else: + return '' + + def encoder(self, func): + self.encode = func + + def decoder(self, func): + self.decode = func + + async def execute(self, *args, **kwargs): + """ + Execute the underlying function, with the given arguments. + """ + return await self.func(*args, **kwargs) + + async def run(self, connection, args, kwargs): + """ + Run the route, with the given arguments, using the given connection. + """ + # TODO: ContextVar here for logging? Or in handle_request? + # Get encoded result + # TODO: handle exceptions in the execution process + try: + result = await self.execute(*args, **kwargs) + payload = self.encode(result) + except Exception: + logger.exception(f"Exception occured running route '{self.name}' with args: {args} and kwargs: {kwargs}") + payload = b'' + _, writer = connection + writer.write(payload) + await writer.drain() + writer.close() diff --git a/bot/meta/ipc/server.py b/bot/meta/ipc/server.py new file mode 100644 index 00000000..6d9545f4 --- /dev/null +++ b/bot/meta/ipc/server.py @@ -0,0 +1,173 @@ +import asyncio +import pickle +import logging +import string +import random + +from ..logger import log_action, log_context, log_app + +logger = logging.getLogger(__name__) + + +uuid_alphabet = string.ascii_lowercase + string.digits + + +def short_uuid(): + return ''.join(random.choices(uuid_alphabet, k=10)) + + +class AppServer: + routes = {} # route name -> bound method + + def __init__(self): + self.clients = {} # AppID -> (info, connection) + + self.route('ping')(self.route_ping) + self.route('whereis')(self.route_whereis) + self.route('peers')(self.route_peers) + self.route('connect')(self.client_connection) + + @classmethod + def route(cls, route_name): + """ + Decorator to add a route to the server. + """ + def wrapper(coro): + cls.routes[route_name] = coro + return coro + return wrapper + + async def route_ping(self, connection): + """ + Pong. + """ + reader, writer = connection + writer.write(b"Pong") + writer.write_eof() + + async def route_whereis(self, connection, appid): + """ + Return an address for the given client appid. + Returns None if the client does not have a connection. + """ + reader, writer = connection + if appid in self.clients: + writer.write(pickle.dumps(self.clients[appid][0])) + else: + writer.write(b'') + writer.write_eof() + + async def route_peers(self, connection): + """ + Send back a map of current peers. + """ + reader, writer = connection + peers = self.peer_list() + payload = pickle.dumps(('peer_list', (peers,))) + writer.write(payload) + writer.write_eof() + + async def client_connection(self, connection, appid, address): + """ + Register and hold a new client connection. + """ + log_action.set("CONN " + appid) + reader, writer = connection + # Add the new client + self.clients[appid] = (address, connection) + + # Send the new client a client list + peers = self.peer_list() + writer.write(pickle.dumps(peers)) + writer.write(b'\n') + await writer.drain() + + # Announce the new client to everyone + await self.broadcast('new_peer', (), {'appid': appid, 'address': address}) + + # Keep the connection open until socket closed or EOF (indicating client death) + try: + await reader.read() + finally: + # Connection ended or it broke + logger.info(f"Lost client '{appid}'") + await self.deregister_client(appid) + + async def handle_connection(self, reader, writer): + data = await reader.readline() + route, args, kwargs = pickle.loads(data) + + rqid = short_uuid() + log_context.set("RQID:" + rqid) + log_action.set("SERV ROUTE " + route) + + logger.info(f"AppServer handling request on route '{route}' with args {args} and kwargs {kwargs}") + + if route in self.routes: + # Execute route + try: + await self.routes[route]((reader, writer), *args, **kwargs) + except Exception: + logger.exception(f"AppServer recieved exception during route '{route}'") + else: + logger.warning(f"AppServer recieved unknown route '{route}'. Ignoring.") + + def peer_list(self): + return {appid: address for appid, (address, _) in self.clients.items()} + + async def deregister_client(self, appid): + self.clients.pop(appid, None) + await self.broadcast('drop_peer', (), {'appid': appid}) + + async def broadcast(self, route, args, kwargs): + logger.debug(f"Sending broadcast on route '{route}' with args {args} and kwargs {kwargs}.") + payload = pickle.dumps((route, args, kwargs)) + if self.clients: + await asyncio.gather( + *(self._send(appid, payload) for appid in self.clients), + return_exceptions=True + ) + + async def message_client(self, appid, route, args, kwargs): + """ + Send a message to client `appid` along `route` with given arguments. + """ + logger.debug(f"Sending '{route}' to '{appid}' with args {args} and kwargs {kwargs}.") + if appid not in self.clients: + raise ValueError(f"Client '{appid}' is not connected.") + + payload = pickle.dumps((route, args, kwargs)) + return await self._send(appid, payload) + + async def _send(self, appid, payload): + """ + Send the encoded `payload` to the client `appid`. + """ + address, _ = self.clients[appid] + try: + reader, writer = await asyncio.open_connection(**address) + writer.write(payload) + writer.write_eof() + await writer.drain() + writer.close() + except Exception as ex: + # TODO: Close client if we can't connect? + logger.exception(f"Failed to send message to '{appid}'") + raise ex + + async def start(self, address): + log_app.set("APPSERVER") + server = await asyncio.start_server(self.handle_connection, **address) + logger.info(f"Serving on {address}") + async with server: + await server.serve_forever() + + +async def start_server(): + address = {'host': '127.0.0.1', 'port': '5000'} + server = AppServer() + await server.start(address) + + +if __name__ == '__main__': + asyncio.run(start_server()) diff --git a/run.py b/run.py index 56a190ed..71805b04 100644 --- a/run.py +++ b/run.py @@ -3,4 +3,4 @@ import os sys.path.insert(0, os.path.join(os.getcwd(), "bot")) -from bot import dev_main +from bot import main diff --git a/run_server.py b/run_server.py new file mode 100644 index 00000000..e4ffd6e4 --- /dev/null +++ b/run_server.py @@ -0,0 +1,20 @@ +import sys +import os +import argparse +import asyncio + + +sys.path.insert(0, os.path.join(os.getcwd(), "bot")) + +from bot.meta.ipc.server import AppServer +from bot.meta import conf + + +async def main(): + address = {'host': conf.appipc['server_host'], 'port': int(conf.appipc['server_port'])} + server = AppServer() + await server.start(address) + + +if __name__ == '__main__': + asyncio.run(main())