""" Minimal IRC client intended to connect to one channel and send a message. """ import asyncio import aiohttp import time import re from . import logger ACTIONS = ( "JOIN", "PART", "PING", "PRIVMSG", "PRIVMSG(ECHO)", "USERSTATE", "MODE", "WHISPER", "USERNOTICE", "NOTICE", ) ACTIONS2 = ("USERSTATE", "ROOMSTATE", "PRIVMSG", "USERNOTICE", "WHISPER") USER_SUB = re.compile(r":(?P.*)!") MESSAGE_RE = re.compile( r":(?P\S+) (?P\S+) (?P\S+)( :(?P.*))?$" ) FAST_RETURN = { "RECONNECT": {"code": 0, "action": "RECONNECT"}, "PING": {"action": "PING"}, } def parser(data: str, nick: str): groups = data.split() action = groups[1] if groups[1] == "JOIN" else groups[-2] channel = None message = None user = None badges = None _group_len = len(groups) if action in FAST_RETURN: return FAST_RETURN[action] elif groups[1] in FAST_RETURN: return FAST_RETURN[groups[1]] elif ( groups[1] in ACTIONS or (_group_len > 2 and groups[2] in ACTIONS) or (_group_len > 3 and groups[3] in {"PRIVMSG", "PRIVMSG(ECHO)"}) ): result = re.search(MESSAGE_RE, data) if not result: logger.error("****** MESSAGE_RE Failed! ******") raise ValueError("Could not parse message '%s' as IRC message" % data) user = result.group("useraddr").split("!")[0] action = result.group("action") channel = result.group("channel").lstrip("#") message = result.group("message") if action == "WHISPER": channel = None if action in ACTIONS2: prebadge = groups[0].split(";") badges = {} for badge in prebadge: badge = badge.split("=") try: badges[badge[0]] = badge[1] except IndexError: pass if action == "USERSTATE" and badges.get("display-name"): user = badges["display-name"].lower() if action == "USERNOTICE" and badges.get("login"): user = badges["login"].lower() elif action not in ACTIONS: action = None if not user: try: user = re.search(USER_SUB, groups[0]).group("user") except (AttributeError, ValueError): pass try: code = int(groups[1]) except ValueError: code = 0 batches = [] if code == 353: channel = groups[4] if channel[0] == "#": channel = channel[1:] else: logger.warning(f" (353) parse failed? ||{channel}||") if user is None: user = groups[-1][1:].lower() for b in groups[5:-1]: if b[0] == ":": b = b[1:] if "\r\n:" in b: batches.append(b.split("\r\n:")[0]) break else: batches.append(b) return dict( data=data, nick=nick, groups=groups, action=action, channel=channel, user=user, badges=badges, code=code, message=message, batches=batches, ) class FocusChannel: IRC_URI = "wss://irc-ws.chat.twitch.tv:443" def __init__(self, channel: str, token: str, session: aiohttp.ClientSession): self.channel = channel.lower() self.token = token self.session = session self._socket = None self._keeper: asyncio.Task | None = None self._ws_ready_event: asyncio.Event = asyncio.Event() self._joined: asyncio.Event = asyncio.Event() self.is_ready: asyncio.Event = asyncio.Event() self.modes = ("commands",) self._last_ping = 0 self._reconnect_requested = False @property def is_alive(self): return self._socket is not None and not self._socket.closed async def wait_until_ready(self): await self.is_ready.wait() async def connect(self): self.is_ready.clear() if self._keeper: self._keeper.cancel() if self.is_alive: await self._socket.close() try: self._socket = await self.session.ws_connect(url=self.IRC_URI) except Exception as e: logger.error(f"FocusChannel IRC connection failed: {e}") raise self._reconnect_requested = False self._keeper = asyncio.create_task(self._keep_alive()) self._ws_ready_event.set() async def authenticate(self): if not self.is_alive: raise ValueError("Cannot authenticate before connection.") await self.send(f"PASS oauth:{self.token}\r\n") await self.send(f"NICK {self.channel}\r\n") for cap in self.modes: await self.send(f"CAP REQ :twitch.tv/{cap}") async def join(self): if not self.is_alive: raise ValueError("Cannot join before connection.") self._joined.clear() await self.send(f"JOIN #{self.channel}\r\n") await self._joined.wait() async def send(self, message: str): await self._socket.send_str(message + "\r\n") async def _process_data(self, data: str): data = data.rstrip() parsed = parser(data, self.channel) if parsed["action"] == "PING": return await self._ping() elif parsed["action"] == "RECONNECT": return await self._reconnect(parsed) elif parsed["code"] != 0: return await self._code(parsed, parsed["code"]) elif data.startswith(":tmi.twitch.tv NOTICE * :Login unsuccessful"): logger.error(f"Failed to login to Twitch IRC channel {self.channel}") return await self._close() else: # TODO: We could handle other actions here return None async def _code(self, parsed, code: int): logger.info(f"{self!r} received '{code}': {parsed}") if code == 1: logger.info(f"FocusChannel logged into '{self.channel}'") elif code == 353: pass elif code in (2, 3, 4, 366, 372, 375): pass elif code == 376: pass else: pass async def _ping(self): self._last_ping = time.monotonic() await self.send("PONG :tmi.twitch.tv\r\n") async def _close(self): if self._keeper: self._keeper.cancel() self.is_ready.clear() if self._socket: await self._socket.close() async def _join(self, parsed): channel = parsed["channel"] logger.info(f"ACTION: JOIN:: {channel}") if self.channel != channel.lower(): logger.info( f"FocusChannel '{self.channel}' got join event for '{channel}'.. ignoring" ) else: self._joined.set() async def _keep_alive(self): await self._ws_ready_event.wait() self._ws_ready_event.clear() if not self._last_ping: self._last_ping = time.monotonic() while not self._socket.closed and not self._reconnect_requested: msg = await self._socket.receive() if msg.type is aiohttp.WSMsgType.CLOSED: logger.error(f"Websocket connection closed: {msg.extra}") break data = msg.data if data: logger.debug(f" < {data}") events = data.split("\r\n") for event in events: if not event: continue try: await self._process_data(event) except Exception: logger.exception( "Websocket message processing failed: %s", str(event) ) async def _reconnect(self, parsed): logger.info(f"ACTION: {self!r} recconecting") self._reconnect_requested = True if self._keeper: self._keeper.cancel() asyncio.create_task(self.connect()) async def send_msg(self, content: str): await self._socket.send_str(content) async def delete_msg(self, msgid: str): return await self.send_msg(f"/delete {msgid}")