from typing import Optional import random import twitchio from twitchio import PartialUser, Scopes, eventsub from twitchio.ext import commands as cmds from botdata import BotChannel from meta import Bot from utils.lib import utc_now from . import logger from .data import EventData, TrackingChannel class TrackerComponent(cmds.Component): def __init__(self, bot: Bot): self.bot = bot self.data = bot.dbconn.load_registry(EventData()) # One command, which sends join link to start tracking # Utility for fetch_or_create community and profiles and setting names # Then just a stack of event listeners. # ----- API ----- async def component_load(self): await self.data.init() await self.bot.version_check(*self.data.VERSION) async def component_teardown(self): pass # ----- Methods ----- async def start_tracking(self, channel: TrackingChannel): # TODO: Make sure that we aren't trying to make duplicate subscriptions here logger.debug( "Initialising event tracking for %s", channel.userid ) # Get associated auth scopes rows = await self.bot.data.user_auth_scopes.select_where(userid=channel.userid) scopes = Scopes([row['scope'] for row in rows]) # Build subscription payloads based on available scopes subs = [] usersubs = [] subcls = [] if Scopes.channel_read_redemptions in scopes or Scopes.channel_manage_redemptions in scopes: subcls.append(eventsub.ChannelPointsRedeemAddSubscription) subcls.append(eventsub.ChannelPointsRedeemUpdateSubscription) if Scopes.bits_read in scopes: subcls.append(eventsub.ChannelBitsUseSubscription) subcls.append(eventsub.ChannelCheerSubscription) if Scopes.channel_read_subscriptions in scopes: subcls.extend(( eventsub.ChannelSubscribeSubscription, eventsub.ChannelSubscribeMessageSubscription, eventsub.ChannelSubscriptionGiftSubscription, )) if Scopes.channel_read_polls in scopes or Scopes.channel_manage_polls in scopes: subcls.append(eventsub.ChannelPollEndSubscription) if Scopes.channel_read_vips in scopes or Scopes.channel_manage_vips in scopes: subcls.extend(( eventsub.ChannelVIPAddSubscription, eventsub.ChannelVIPRemoveSubscription, )) subcls.extend(( eventsub.StreamOnlineSubscription, eventsub.StreamOfflineSubscription, eventsub.ChannelUpdateSubscription, )) for subbr in subcls: subs.append(subbr(broadcaster_user_id=channel.userid)) # This isn't needed because joining the channel means we have these # Including them for completeness though # if Scopes.chat_read in scopes or Scopes.channel_bot in scopes: # subs.append( # eventsub.ChatMessageSubscription( # broadcaster_user_id=channel.userid, # user_id=self.bot.bot_id, # ) # ) if Scopes.moderator_read_followers in scopes: usersubs.append( eventsub.ChannelFollowSubscription( broadcaster_user_id=channel.userid, moderator_user_id=channel.userid, ) ) subs.extend(( eventsub.ChannelRaidSubscription(to_broadcaster_user_id=channel.userid), eventsub.ChannelRaidSubscription(from_broadcaster_user_id=channel.userid), )) responses = [] for sub in subs: try: if self.bot.using_webhooks: resp = await self.bot.subscribe_webhook(sub) else: resp = await self.bot.subscribe_websocket(sub) responses.append(resp) except Exception: logger.exception("Failed to subscribe to %s", str(sub)) for sub in usersubs: try: if self.bot.using_webhooks: resp = await self.bot.subscribe_webhook(sub) else: resp = await self.bot.subscribe_websocket(sub, token_for=channel.userid, as_bot=False) responses.append(resp) except Exception: logger.exception("Failed to subscribe to %s", str(sub)) logger.info("Finished tracker subscription to %s: %s", channel.userid, ', '.join(map(str, responses))) # ----- Events ----- @cmds.Component.listener() async def event_safe_channel_joined(self, payload: BotChannel): # Check if the channel is tracked # If it is, call start_tracking tracked = await TrackingChannel.fetch(payload.userid) if tracked and tracked.joined: await self.start_tracking(tracked) @cmds.Component.listener() async def event_custom_redemption_add(self, payload: twitchio.ChannelPointsRedemptionAdd): tracked = await TrackingChannel.fetch(payload.broadcaster.id) if tracked and tracked.joined: community = await self.bot.profiles.fetch_community(payload.broadcaster) cid = community.communityid profile = await self.bot.profiles.fetch_profile(payload.user) pid = profile.profileid event_row = await self.data.events.insert( event_type='redemption_add', communityid=cid, channel_id=payload.broadcaster.id, profileid=pid, user_id=payload.user.id, occurred_at=payload.redeemed_at, ) detail_row = await self.data.redemption_add_events.insert( event_id=event_row['event_id'], redeem_id=payload.reward.id, redeem_title=payload.reward.title, redeem_cost=payload.reward.cost, redemption_id=payload.id, redemption_status=payload.status, message=payload.user_input, ) @cmds.Component.listener() async def event_custom_redemption_update(self, payload: twitchio.ChannelPointsRedemptionUpdate): tracked = await TrackingChannel.fetch(payload.broadcaster.id) if tracked and tracked.joined: community = await self.bot.profiles.fetch_community(payload.broadcaster) cid = community.communityid profile = await self.bot.profiles.fetch_profile(payload.user) pid = profile.profileid event_row = await self.data.events.insert( event_type='redemption_update', communityid=cid, channel_id=payload.broadcaster.id, profileid=pid, user_id=payload.user.id, ) detail_row = await self.data.redemption_update_events.insert( event_id=event_row['event_id'], redeem_id=payload.reward.id, redeem_title=payload.reward.title, redeem_cost=payload.reward.cost, redemption_id=payload.id, redemption_status=payload.status, redeemed_at=utc_now() ) @cmds.Component.listener() async def event_follow(self, payload: twitchio.ChannelFollow): tracked = await TrackingChannel.fetch(payload.broadcaster.id) if tracked and tracked.joined: community = await self.bot.profiles.fetch_community(payload.broadcaster) cid = community.communityid profile = await self.bot.profiles.fetch_profile(payload.user) pid = profile.profileid # Computer follower count followers = await payload.broadcaster.fetch_followers() event_row = await self.data.events.insert( event_type='follow', communityid=cid, channel_id=payload.broadcaster.id, profileid=pid, user_id=payload.user.id, ) detail_row = await self.data.follow_events.insert( event_id=event_row['event_id'], follower_count=followers.total ) @cmds.Component.listener() async def event_bits_use(self, payload: twitchio.ChannelBitsUse): tracked = await TrackingChannel.fetch(payload.broadcaster.id) if tracked and tracked.joined: community = await self.bot.profiles.fetch_community(payload.broadcaster) cid = community.communityid profile = await self.bot.profiles.fetch_profile(payload.user) pid = profile.profileid event_row = await self.data.events.insert( event_type='bits', communityid=cid, channel_id=payload.broadcaster.id, profileid=pid, user_id=payload.user.id, ) detail_row = await self.data.bits_events.insert( event_id=event_row['event_id'], bits=payload.bits, bits_type=payload.type, message=payload.text, powerup_type=payload.power_up.type if payload.power_up else None ) self.bot.safe_dispatch('bits_use', payload=(event_row, detail_row, payload)) @cmds.Component.listener() async def event_subscription(self, payload: twitchio.ChannelSubscribe): tracked = await TrackingChannel.fetch(payload.broadcaster.id) if tracked and tracked.joined: community = await self.bot.profiles.fetch_community(payload.broadcaster) cid = community.communityid profile = await self.bot.profiles.fetch_profile(payload.user) pid = profile.profileid event_row = await self.data.events.insert( event_type='subscribe', communityid=cid, channel_id=payload.broadcaster.id, profileid=pid, user_id=payload.user.id, ) detail_row = await self.data.subscribe_events.insert( event_id=event_row['event_id'], tier=int(payload.tier), gifted=payload.gift, ) self.bot.safe_dispatch('subscription', payload=(event_row, detail_row, payload)) @cmds.Component.listener() async def event_subscription_gift(self, payload: twitchio.ChannelSubscriptionGift): tracked = await TrackingChannel.fetch(payload.broadcaster.id) if tracked and tracked.joined: community = await self.bot.profiles.fetch_community(payload.broadcaster) cid = community.communityid if payload.user is not None: profile = await self.bot.profiles.fetch_profile(payload.user) pid = profile.profileid else: pid = None event_row = await self.data.events.insert( event_type='gift', communityid=cid, channel_id=payload.broadcaster.id, profileid=pid, user_id=payload.user.id if payload.user else None, ) detail_row = await self.data.gift_events.insert( event_id=event_row['event_id'], tier=int(payload.tier), gifted_count=payload.total, ) self.bot.safe_dispatch('subscription_gift', payload=(event_row, detail_row, payload)) @cmds.Component.listener() async def event_subscription_message(self, payload: twitchio.ChannelSubscriptionMessage): tracked = await TrackingChannel.fetch(payload.broadcaster.id) if tracked and tracked.joined: community = await self.bot.profiles.fetch_community(payload.broadcaster) cid = community.communityid profile = await self.bot.profiles.fetch_profile(payload.user) pid = profile.profileid event_row = await self.data.events.insert( event_type='subscribe_message', communityid=cid, channel_id=payload.broadcaster.id, profileid=pid, user_id=payload.user.id, ) detail_row = await self.data.subscribe_message_events.insert( event_id=event_row['event_id'], tier=int(payload.tier), duration_months=payload.months, cumulative_months=payload.cumulative_months, streak_months=payload.streak_months, message=payload.text, ) self.bot.safe_dispatch('subscription_message', payload=(event_row, detail_row, payload)) @cmds.Component.listener() async def event_stream_online(self, payload: twitchio.StreamOnline): tracked = await TrackingChannel.fetch(payload.broadcaster.id) if tracked and tracked.joined: community = await self.bot.profiles.fetch_community(payload.broadcaster) cid = community.communityid event_row = await self.data.events.insert( event_type='stream_online', communityid=cid, channel_id=payload.broadcaster.id, occurred_at=payload.started_at, ) detail_row = await self.data.stream_online_events.insert( event_id=event_row['event_id'], stream_id=payload.id, stream_type=payload.type, ) @cmds.Component.listener() async def event_stream_offline(self, payload: twitchio.StreamOffline): tracked = await TrackingChannel.fetch(payload.broadcaster.id) if tracked and tracked.joined: community = await self.bot.profiles.fetch_community(payload.broadcaster) cid = community.communityid event_row = await self.data.events.insert( event_type='stream_offline', communityid=cid, channel_id=payload.broadcaster.id, ) detail_row = await self.data.stream_offline_events.insert( event_id=event_row['event_id'], ) @cmds.Component.listener() async def event_raid(self, payload: twitchio.ChannelRaid): await self._event_raid_out( payload.from_broadcaster, payload.to_broadcaster, payload.viewer_count, ) await self._event_raid_in( payload.to_broadcaster, payload.from_broadcaster, payload.viewer_count, ) async def _event_raid_out(self, broadcaster: PartialUser, to_broadcaster: PartialUser, viewer_count: int): tracked = await TrackingChannel.fetch(broadcaster.id) if tracked and tracked.joined: community = await self.bot.profiles.fetch_community(broadcaster) cid = community.communityid event_row = await self.data.events.insert( event_type='raidout', communityid=cid, channel_id=broadcaster.id, ) detail_row = await self.data.raid_out_events.insert( event_id=event_row['event_id'], target_id=to_broadcaster.id, target_name=to_broadcaster.name, viewer_count=viewer_count ) async def _event_raid_in(self, broadcaster: PartialUser, from_broadcaster: PartialUser, viewer_count: int): tracked = await TrackingChannel.fetch(broadcaster.id) if tracked and tracked.joined: community = await self.bot.profiles.fetch_community(broadcaster) cid = community.communityid event_row = await self.data.events.insert( event_type='raidin', communityid=cid, channel_id=broadcaster.id, ) detail_row = await self.data.raid_in_events.insert( event_id=event_row['event_id'], source_id=from_broadcaster.id, source_name=from_broadcaster.name, viewer_count=viewer_count ) @cmds.Component.listener() async def event_message(self, payload: twitchio.ChatMessage): tracked = await TrackingChannel.fetch(payload.broadcaster.id) if tracked and tracked.joined: community = await self.bot.profiles.fetch_community(payload.broadcaster) cid = community.communityid profile = await self.bot.profiles.fetch_profile(payload.chatter) pid = profile.profileid event_row = await self.data.events.insert( event_type='message', communityid=cid, channel_id=payload.broadcaster.id, profileid=pid, user_id=payload.chatter.id, ) detail_row = await self.data.message_events.insert( event_id=event_row['event_id'], message_id=payload.id, message_type=payload.type, content=payload.text, source_channel_id=payload.source_id ) # ----- Commands ----- @cmds.command(name='starttracking') async def cmd_starttracking(self, ctx: cmds.Context): if ctx.broadcaster: tracking = await TrackingChannel.fetch_or_create(ctx.channel.id, joined=True) if not tracking.joined: await tracking.update(joined=True) rows = await self.bot.data.user_auth_scopes.select_where(userid=ctx.channel.id) scopes = Scopes([row['scope'] for row in rows]) url = self.bot.get_auth_url( Scopes({ Scopes.channel_read_subscriptions, Scopes.channel_read_redemptions, Scopes.bits_read, Scopes.channel_read_polls, Scopes.channel_read_vips, Scopes.moderator_read_followers, *scopes }) ) await ctx.reply(f"Tracking enabled! Please authorise me to track events in this channel: {url}") else: await ctx.reply("Only the broadcaster can enable tracking.") @cmds.command(name='stoptracking') async def cmd_stoptracking(self, ctx: cmds.Context): if ctx.broadcaster: tracking = await TrackingChannel.fetch(ctx.channel.id) if tracking and tracking.joined: await tracking.update(joined=False) # TODO: Actually disable the subscriptions instead of just on the next restart # This is tricky because some of the subscriptions may have been requested by other modules # Requires keeping track of the source of subscriptions, and having a central manager disable them when no-one is listening anymore. pass await ctx.reply("Event tracking has been disabled.") else: await ctx.reply("Event tracking is not enabled!") else: await ctx.reply("Only the broadcaster can enable tracking.") @cmds.command(name='join') async def cmd_join(self, ctx: cmds.Context): url = self.bot.get_auth_url() await ctx.reply(f"Invite me to your channel with: {url}")