Files
twitch-eventtracker-plugin/tracker/component.py
2025-09-01 19:47:34 +10:00

478 lines
20 KiB
Python

from typing import Optional
import random
import twitchio
from twitchio import PartialUser, Scopes, eventsub
from twitchio.ext import commands as cmds
from datamodels import BotChannel, Communities, UserProfile
from meta import Bot
from utils.lib import utc_now
from . import logger
from .data import EventData, TrackingChannel
class TrackerComponent(cmds.Component):
    """Record Twitch events for channels that have opted in to tracking.

    Layout of the component:

    * ``start_tracking`` requests the EventSub subscriptions that the
      broadcaster's stored auth scopes allow.
    * One listener per event type. Each listener ignores channels without a
      joined ``TrackingChannel`` row, then writes a generic row to
      ``data.events`` followed by an event-specific detail row.
    * Chat commands to enable/disable tracking and to fetch the join link.
    """

    def __init__(self, bot: Bot):
        self.bot = bot
        self.data = bot.dbconn.load_registry(EventData())
        # Debug aid: list the listeners collected for this component.
        # Sent through the module logger rather than printed to stdout.
        logger.debug("Tracker listeners: %s", self.__all_listeners__)

    # One command, which sends join link to start tracking
    # Utility for fetch_or_create community and profiles and setting names
    # Then just a stack of event listeners.

    # ----- API -----
    async def component_load(self):
        """Initialise the event data registry when the component loads."""
        await self.data.init()

    async def component_teardown(self):
        """Nothing to release on teardown."""
        pass

    # ----- Internal helpers -----
    async def _is_tracked(self, channel_id) -> bool:
        """Return True when a tracking row exists for the channel and is joined."""
        tracked = await TrackingChannel.fetch(channel_id)
        return bool(tracked and tracked.joined)

    async def _community_id(self, broadcaster: PartialUser):
        """Fetch the community for a broadcaster and return its ``communityid``."""
        community = await self.bot.community_fetch(
            twitchid=broadcaster.id, name=broadcaster.name,
        )
        return community.communityid

    async def _profile_id(self, user: PartialUser):
        """Fetch the profile for a Twitch user and return its ``profileid``."""
        profile = await self.bot.profile_fetch(
            twitchid=user.id, name=user.name,
        )
        return profile.profileid

    # ----- Methods -----
    async def start_tracking(self, channel: TrackingChannel):
        """Request the EventSub subscriptions available for ``channel``.

        Scope-gated subscriptions are only requested when the matching scope
        is stored for ``channel.userid``. Stream online/offline, channel
        update and raid subscriptions are always requested. A failed
        subscription is logged and skipped so that the remaining ones are
        still attempted.
        """
        # TODO: Make sure that we aren't trying to make duplicate subscriptions here
        logger.debug(
            "Initialising event tracking for %s",
            channel.userid
        )

        # Get associated auth scopes
        rows = await self.bot.data.user_auth_scopes.select_where(userid=channel.userid)
        scopes = Scopes([row['scope'] for row in rows])

        # Subscription classes that only need ``broadcaster_user_id``.
        subcls = []
        if Scopes.channel_read_redemptions in scopes or Scopes.channel_manage_redemptions in scopes:
            subcls.append(eventsub.ChannelPointsRedeemAddSubscription)
            subcls.append(eventsub.ChannelPointsRedeemUpdateSubscription)
        if Scopes.bits_read in scopes:
            subcls.append(eventsub.ChannelBitsUseSubscription)
            subcls.append(eventsub.ChannelCheerSubscription)
        if Scopes.channel_read_subscriptions in scopes:
            subcls.extend((
                eventsub.ChannelSubscribeSubscription,
                eventsub.ChannelSubscribeMessageSubscription,
                eventsub.ChannelSubscriptionGiftSubscription,
            ))
        if Scopes.channel_read_polls in scopes or Scopes.channel_manage_polls in scopes:
            subcls.append(eventsub.ChannelPollEndSubscription)
        if Scopes.channel_read_vips in scopes or Scopes.channel_manage_vips in scopes:
            subcls.extend((
                eventsub.ChannelVIPAddSubscription,
                eventsub.ChannelVIPRemoveSubscription,
            ))
        subcls.extend((
            eventsub.StreamOnlineSubscription,
            eventsub.StreamOfflineSubscription,
            eventsub.ChannelUpdateSubscription,
        ))
        subs = [subbr(broadcaster_user_id=channel.userid) for subbr in subcls]

        # This isn't needed because joining the channel means we have these
        # Including them for completeness though
        # if Scopes.chat_read in scopes or Scopes.channel_bot in scopes:
        #     subs.append(
        #         eventsub.ChatMessageSubscription(
        #             broadcaster_user_id=channel.userid,
        #             user_id=self.bot.bot_id,
        #         )
        #     )

        # Subscriptions requested with the broadcaster's own token when
        # subscribing over websockets.
        usersubs = []
        if Scopes.moderator_read_followers in scopes:
            usersubs.append(
                eventsub.ChannelFollowSubscription(
                    broadcaster_user_id=channel.userid,
                    moderator_user_id=channel.userid,
                )
            )

        # Raids are tracked in both directions.
        subs.extend((
            eventsub.ChannelRaidSubscription(to_broadcaster_user_id=channel.userid),
            eventsub.ChannelRaidSubscription(from_broadcaster_user_id=channel.userid),
        ))

        # Pair each subscription with the extra websocket keyword arguments it
        # needs; bot-token subscriptions first, then user-token ones.
        pending = [(sub, {}) for sub in subs]
        pending.extend(
            (sub, {'token_for': channel.userid, 'as_bot': False})
            for sub in usersubs
        )

        responses = []
        for sub, websocket_kwargs in pending:
            try:
                if self.bot.using_webhooks:
                    resp = await self.bot.subscribe_webhook(sub)
                else:
                    resp = await self.bot.subscribe_websocket(sub, **websocket_kwargs)
            except Exception:
                # Best effort: one failed subscription must not block the rest.
                logger.exception("Failed to subscribe to %s", str(sub))
            else:
                responses.append(resp)

        logger.info("Finished tracker subscription to %s: %s", channel.userid, ', '.join(map(str, responses)))

    # ----- Events -----
    @cmds.Component.listener()
    async def event_safe_channel_joined(self, payload: BotChannel):
        """Start tracking a newly joined channel if it has tracking enabled."""
        tracked = await TrackingChannel.fetch(payload.userid)
        if tracked and tracked.joined:
            await self.start_tracking(tracked)

    @cmds.Component.listener()
    async def event_custom_redemption_add(self, payload: twitchio.ChannelPointsRedemptionAdd):
        """Record a new channel point redemption in a tracked channel."""
        if not await self._is_tracked(payload.broadcaster.id):
            return
        cid = await self._community_id(payload.broadcaster)
        pid = await self._profile_id(payload.user)

        event_row = await self.data.events.insert(
            event_type='redemption_add',
            communityid=cid,
            channel_id=payload.broadcaster.id,
            profileid=pid,
            user_id=payload.user.id,
            occurred_at=payload.redeemed_at,
        )
        await self.data.redemption_add_events.insert(
            event_id=event_row['event_id'],
            redeem_id=payload.reward.id,
            redeem_title=payload.reward.title,
            redeem_cost=payload.reward.cost,
            redemption_id=payload.id,
            redemption_status=payload.status,
            message=payload.user_input,
        )

    @cmds.Component.listener()
    async def event_custom_redemption_update(self, payload: twitchio.ChannelPointsRedemptionUpdate):
        """Record a status update to a channel point redemption."""
        if not await self._is_tracked(payload.broadcaster.id):
            return
        cid = await self._community_id(payload.broadcaster)
        pid = await self._profile_id(payload.user)

        event_row = await self.data.events.insert(
            event_type='redemption_update',
            communityid=cid,
            channel_id=payload.broadcaster.id,
            profileid=pid,
            user_id=payload.user.id,
        )
        await self.data.redemption_update_events.insert(
            event_id=event_row['event_id'],
            redeem_id=payload.reward.id,
            redeem_title=payload.reward.title,
            redeem_cost=payload.reward.cost,
            redemption_id=payload.id,
            redemption_status=payload.status,
            # Store when the reward was actually redeemed (as the add listener
            # does), not the moment this update happened to be processed.
            redeemed_at=payload.redeemed_at,
        )

    @cmds.Component.listener()
    async def event_follow(self, payload: twitchio.ChannelFollow):
        """Record a follow together with the follower total at that moment."""
        if not await self._is_tracked(payload.broadcaster.id):
            return
        cid = await self._community_id(payload.broadcaster)
        pid = await self._profile_id(payload.user)

        # Compute follower count
        followers = await payload.broadcaster.fetch_followers()

        event_row = await self.data.events.insert(
            event_type='follow',
            communityid=cid,
            channel_id=payload.broadcaster.id,
            profileid=pid,
            user_id=payload.user.id,
        )
        await self.data.follow_events.insert(
            event_id=event_row['event_id'],
            follower_count=followers.total
        )

    @cmds.Component.listener()
    async def event_bits_use(self, payload: twitchio.ChannelBitsUse):
        """Record bits usage, then dispatch ``bits_use`` with the stored rows."""
        if not await self._is_tracked(payload.broadcaster.id):
            return
        cid = await self._community_id(payload.broadcaster)
        pid = await self._profile_id(payload.user)

        event_row = await self.data.events.insert(
            event_type='bits',
            communityid=cid,
            channel_id=payload.broadcaster.id,
            profileid=pid,
            user_id=payload.user.id,
        )
        detail_row = await self.data.bits_events.insert(
            event_id=event_row['event_id'],
            bits=payload.bits,
            bits_type=payload.type,
            message=payload.text,
            powerup_type=payload.power_up.type if payload.power_up else None
        )
        self.bot.safe_dispatch('bits_use', payload=(event_row, detail_row, payload))

    @cmds.Component.listener()
    async def event_subscription(self, payload: twitchio.ChannelSubscribe):
        """Record a subscription, then dispatch ``subscription`` with the rows."""
        if not await self._is_tracked(payload.broadcaster.id):
            return
        cid = await self._community_id(payload.broadcaster)
        pid = await self._profile_id(payload.user)

        event_row = await self.data.events.insert(
            event_type='subscribe',
            communityid=cid,
            channel_id=payload.broadcaster.id,
            profileid=pid,
            user_id=payload.user.id,
        )
        detail_row = await self.data.subscribe_events.insert(
            event_id=event_row['event_id'],
            tier=int(payload.tier),
            gifted=payload.gift,
        )
        self.bot.safe_dispatch('subscription', payload=(event_row, detail_row, payload))

    @cmds.Component.listener()
    async def event_subscription_gift(self, payload: twitchio.ChannelSubscriptionGift):
        """Record gifted subscriptions, then dispatch ``subscription_gift``.

        ``payload.user`` may be ``None``; in that case the event is stored
        without a profile or user id.
        """
        if not await self._is_tracked(payload.broadcaster.id):
            return
        cid = await self._community_id(payload.broadcaster)
        if payload.user is not None:
            pid = await self._profile_id(payload.user)
        else:
            pid = None

        event_row = await self.data.events.insert(
            event_type='gift',
            communityid=cid,
            channel_id=payload.broadcaster.id,
            profileid=pid,
            user_id=payload.user.id if payload.user else None,
        )
        detail_row = await self.data.gift_events.insert(
            event_id=event_row['event_id'],
            tier=int(payload.tier),
            gifted_count=payload.total,
        )
        self.bot.safe_dispatch('subscription_gift', payload=(event_row, detail_row, payload))

    @cmds.Component.listener()
    async def event_subscription_message(self, payload: twitchio.ChannelSubscriptionMessage):
        """Record a subscription message, then dispatch ``subscription_message``."""
        if not await self._is_tracked(payload.broadcaster.id):
            return
        cid = await self._community_id(payload.broadcaster)
        pid = await self._profile_id(payload.user)

        event_row = await self.data.events.insert(
            event_type='subscribe_message',
            communityid=cid,
            channel_id=payload.broadcaster.id,
            profileid=pid,
            user_id=payload.user.id,
        )
        detail_row = await self.data.subscribe_message_events.insert(
            event_id=event_row['event_id'],
            tier=int(payload.tier),
            duration_months=payload.months,
            cumulative_months=payload.cumulative_months,
            streak_months=payload.streak_months,
            message=payload.text,
        )
        self.bot.safe_dispatch('subscription_message', payload=(event_row, detail_row, payload))

    @cmds.Component.listener()
    async def event_stream_online(self, payload: twitchio.StreamOnline):
        """Record a tracked channel's stream going online."""
        if not await self._is_tracked(payload.broadcaster.id):
            return
        cid = await self._community_id(payload.broadcaster)

        event_row = await self.data.events.insert(
            event_type='stream_online',
            communityid=cid,
            channel_id=payload.broadcaster.id,
            occurred_at=payload.started_at,
        )
        await self.data.stream_online_events.insert(
            event_id=event_row['event_id'],
            stream_id=payload.id,
            stream_type=payload.type,
        )

    @cmds.Component.listener()
    async def event_stream_offline(self, payload: twitchio.StreamOffline):
        """Record a tracked channel's stream going offline."""
        if not await self._is_tracked(payload.broadcaster.id):
            return
        cid = await self._community_id(payload.broadcaster)

        event_row = await self.data.events.insert(
            event_type='stream_offline',
            communityid=cid,
            channel_id=payload.broadcaster.id,
        )
        await self.data.stream_offline_events.insert(
            event_id=event_row['event_id'],
        )

    @cmds.Component.listener()
    async def event_raid(self, payload: twitchio.ChannelRaid):
        """Record a raid from both sides.

        Each side is stored only if that side's channel is tracked, so one
        raid can produce zero, one or two events.
        """
        await self._event_raid_out(
            payload.from_broadcaster,
            payload.to_broadcaster,
            payload.viewer_count,
        )
        await self._event_raid_in(
            payload.to_broadcaster,
            payload.from_broadcaster,
            payload.viewer_count,
        )

    async def _event_raid_out(self, broadcaster: PartialUser, to_broadcaster: PartialUser, viewer_count: int):
        """Record an outgoing raid for ``broadcaster`` if that channel is tracked."""
        if not await self._is_tracked(broadcaster.id):
            return
        cid = await self._community_id(broadcaster)

        event_row = await self.data.events.insert(
            event_type='raidout',
            communityid=cid,
            channel_id=broadcaster.id,
        )
        await self.data.raid_out_events.insert(
            event_id=event_row['event_id'],
            target_id=to_broadcaster.id,
            target_name=to_broadcaster.name,
            viewer_count=viewer_count
        )

    async def _event_raid_in(self, broadcaster: PartialUser, from_broadcaster: PartialUser, viewer_count: int):
        """Record an incoming raid for ``broadcaster`` if that channel is tracked."""
        if not await self._is_tracked(broadcaster.id):
            return
        cid = await self._community_id(broadcaster)

        event_row = await self.data.events.insert(
            event_type='raidin',
            communityid=cid,
            channel_id=broadcaster.id,
        )
        await self.data.raid_in_events.insert(
            event_id=event_row['event_id'],
            source_id=from_broadcaster.id,
            source_name=from_broadcaster.name,
            viewer_count=viewer_count
        )

    @cmds.Component.listener()
    async def event_message(self, payload: twitchio.ChatMessage):
        """Record a chat message sent in a tracked channel."""
        if not await self._is_tracked(payload.broadcaster.id):
            return
        cid = await self._community_id(payload.broadcaster)
        pid = await self._profile_id(payload.chatter)

        event_row = await self.data.events.insert(
            event_type='message',
            communityid=cid,
            channel_id=payload.broadcaster.id,
            profileid=pid,
            user_id=payload.chatter.id,
        )
        await self.data.message_events.insert(
            event_id=event_row['event_id'],
            message_id=payload.id,
            message_type=payload.type,
            content=payload.text,
            source_channel_id=payload.source_id
        )

    # ----- Commands -----
    @cmds.command(name='starttracking')
    async def cmd_starttracking(self, ctx: cmds.Context):
        """Enable tracking for this channel and reply with an authorisation link.

        The link requests the scopes used by the tracker in addition to any
        scopes already stored for the channel.
        """
        # NOTE(review): on a stock TwitchIO 3 Context, ``ctx.broadcaster`` is
        # the channel's PartialUser rather than an "invoker is broadcaster"
        # flag - confirm that the project's Context makes this a real check.
        if not ctx.broadcaster:
            await ctx.reply("Only the broadcaster can enable tracking.")
            return

        tracking = await TrackingChannel.fetch_or_create(ctx.channel.id, joined=True)
        if not tracking.joined:
            # The row already existed with tracking switched off.
            await tracking.update(joined=True)

        rows = await self.bot.data.user_auth_scopes.select_where(userid=ctx.channel.id)
        scopes = Scopes([row['scope'] for row in rows])

        url = self.bot.get_auth_url(
            Scopes({
                Scopes.channel_read_subscriptions,
                Scopes.channel_read_redemptions,
                Scopes.bits_read,
                Scopes.channel_read_polls,
                Scopes.channel_read_vips,
                Scopes.moderator_read_followers,
                *scopes
            })
        )
        await ctx.reply(f"Tracking enabled! Please authorise me to track events in this channel: {url}")

    @cmds.command(name='stoptracking')
    async def cmd_stoptracking(self, ctx: cmds.Context):
        """Disable tracking for this channel.

        Only the stored ``joined`` flag is cleared; existing EventSub
        subscriptions are left in place (see the TODO below).
        """
        # NOTE(review): see cmd_starttracking regarding ``ctx.broadcaster``.
        if not ctx.broadcaster:
            await ctx.reply("Only the broadcaster can disable tracking.")
            return

        tracking = await TrackingChannel.fetch(ctx.channel.id)
        if not (tracking and tracking.joined):
            await ctx.reply("Event tracking is not enabled!")
            return

        await tracking.update(joined=False)
        # TODO: Actually disable the subscriptions instead of just on the next restart
        # This is tricky because some of the subscriptions may have been requested by other modules
        # Requires keeping track of the source of subscriptions, and having a central manager disable them when no-one is listening anymore.
        await ctx.reply("Event tracking has been disabled.")

    @cmds.command(name='join')
    async def cmd_join(self, ctx: cmds.Context):
        """Reply with the link used to invite the bot to a channel."""
        url = self.bot.get_auth_url()
        await ctx.reply(f"Invite me to your channel with: {url}")