- User `profiles` component for profile and community fetch. - Add data version check - Move `datamodels` to `botdata`.
462 lines
19 KiB
Python
462 lines
19 KiB
Python
from typing import Optional
|
|
import random
|
|
import twitchio
|
|
from twitchio import PartialUser, Scopes, eventsub
|
|
from twitchio.ext import commands as cmds
|
|
|
|
from botdata import BotChannel
|
|
from meta import Bot
|
|
from utils.lib import utc_now
|
|
|
|
from . import logger
|
|
from .data import EventData, TrackingChannel
|
|
|
|
|
|
class TrackerComponent(cmds.Component):
    """Component that records Twitch events for channels which opted in to tracking.

    For every tracked channel (a ``TrackingChannel`` row with ``joined`` set) the
    component creates EventSub subscriptions matching the scopes the channel has
    authorised, and each listener below stores the received payload as one row in
    ``self.data.events`` plus one row in an event-specific detail table.
    """

    def __init__(self, bot: Bot):
        """Keep a reference to the bot and load the ``EventData`` registry."""
        self.bot = bot
        self.data = bot.dbconn.load_registry(EventData())

    # One command, which sends join link to start tracking
    # Utility for fetch_or_create community and profiles and setting names
    # Then just a stack of event listeners.

    # ----- API -----
    async def component_load(self):
        """Initialise the data registry and run the bot's data version check."""
        await self.data.init()
        await self.bot.version_check(*self.data.VERSION)

    async def component_teardown(self):
        """Nothing to clean up on unload."""
        pass

    # ----- Methods -----
    async def _is_tracked(self, channel_id) -> bool:
        """Return True when ``channel_id`` has a ``TrackingChannel`` row with ``joined`` set."""
        tracked = await TrackingChannel.fetch(channel_id)
        return bool(tracked and tracked.joined)

    async def _subscribe(self, sub, **websocket_kwargs):
        """Create one EventSub subscription over the transport the bot is using.

        ``websocket_kwargs`` are only forwarded to ``subscribe_websocket``;
        the webhook path is always called with the subscription alone.
        """
        if self.bot.using_webhooks:
            return await self.bot.subscribe_webhook(sub)
        return await self.bot.subscribe_websocket(sub, **websocket_kwargs)

    @staticmethod
    def _invoked_by_broadcaster(ctx: cmds.Context) -> bool:
        """Return True when the chatter who invoked ``ctx`` owns the channel.

        ``ctx.broadcaster`` is the channel's user object and is therefore always
        truthy, so it cannot be used as a permission check on its own; the
        invoking chatter's id has to be compared against it.
        """
        return str(ctx.chatter.id) == str(ctx.broadcaster.id)

    async def start_tracking(self, channel: TrackingChannel):
        """Subscribe to every event type the channel's stored scopes allow.

        Each subscription is attempted independently: a failure is logged with
        its traceback and the remaining subscriptions are still attempted.
        """
        # TODO: Make sure that we aren't trying to make duplicate subscriptions here
        logger.debug(
            "Initialising event tracking for %s",
            channel.userid
        )

        # Get associated auth scopes
        rows = await self.bot.data.user_auth_scopes.select_where(userid=channel.userid)
        scopes = Scopes([row['scope'] for row in rows])

        # Subscription classes which only need the broadcaster id, selected by scope
        subcls = []
        if Scopes.channel_read_redemptions in scopes or Scopes.channel_manage_redemptions in scopes:
            subcls.extend((
                eventsub.ChannelPointsRedeemAddSubscription,
                eventsub.ChannelPointsRedeemUpdateSubscription,
            ))
        if Scopes.bits_read in scopes:
            subcls.extend((
                eventsub.ChannelBitsUseSubscription,
                eventsub.ChannelCheerSubscription,
            ))
        if Scopes.channel_read_subscriptions in scopes:
            subcls.extend((
                eventsub.ChannelSubscribeSubscription,
                eventsub.ChannelSubscribeMessageSubscription,
                eventsub.ChannelSubscriptionGiftSubscription,
            ))
        if Scopes.channel_read_polls in scopes or Scopes.channel_manage_polls in scopes:
            subcls.append(eventsub.ChannelPollEndSubscription)
        if Scopes.channel_read_vips in scopes or Scopes.channel_manage_vips in scopes:
            subcls.extend((
                eventsub.ChannelVIPAddSubscription,
                eventsub.ChannelVIPRemoveSubscription,
            ))

        # These are requested regardless of the stored scopes
        subcls.extend((
            eventsub.StreamOnlineSubscription,
            eventsub.StreamOfflineSubscription,
            eventsub.ChannelUpdateSubscription,
        ))

        subs = [subbr(broadcaster_user_id=channel.userid) for subbr in subcls]

        # This isn't needed because joining the channel means we have these
        # Including them for completeness though
        # if Scopes.chat_read in scopes or Scopes.channel_bot in scopes:
        #     subs.append(
        #         eventsub.ChatMessageSubscription(
        #             broadcaster_user_id=channel.userid,
        #             user_id=self.bot.bot_id,
        #         )
        #     )

        # Subscriptions which, over websockets, are made with the channel's own token
        usersubs = []
        if Scopes.moderator_read_followers in scopes:
            usersubs.append(
                eventsub.ChannelFollowSubscription(
                    broadcaster_user_id=channel.userid,
                    moderator_user_id=channel.userid,
                )
            )

        # Raids are tracked in both directions
        subs.extend((
            eventsub.ChannelRaidSubscription(to_broadcaster_user_id=channel.userid),
            eventsub.ChannelRaidSubscription(from_broadcaster_user_id=channel.userid),
        ))

        # Plain subscriptions first, then the user-token ones, each paired
        # with the extra arguments its websocket call needs.
        pending = [(sub, {}) for sub in subs]
        pending.extend(
            (sub, {'token_for': channel.userid, 'as_bot': False})
            for sub in usersubs
        )

        responses = []
        for sub, websocket_kwargs in pending:
            try:
                responses.append(await self._subscribe(sub, **websocket_kwargs))
            except Exception:
                # Best effort: one failed subscription must not block the others
                logger.exception("Failed to subscribe to %s", str(sub))

        logger.info("Finished tracker subscription to %s: %s", channel.userid, ', '.join(map(str, responses)))

    # ----- Events -----

    @cmds.Component.listener()
    async def event_safe_channel_joined(self, payload: BotChannel):
        """Start tracking a channel the bot has joined, if that channel is tracked."""
        tracked = await TrackingChannel.fetch(payload.userid)
        if tracked and tracked.joined:
            await self.start_tracking(tracked)

    @cmds.Component.listener()
    async def event_custom_redemption_add(self, payload: twitchio.ChannelPointsRedemptionAdd):
        """Store a new channel point redemption for a tracked channel."""
        if not await self._is_tracked(payload.broadcaster.id):
            return

        community = await self.bot.profiles.fetch_community(payload.broadcaster)
        profile = await self.bot.profiles.fetch_profile(payload.user)

        event_row = await self.data.events.insert(
            event_type='redemption_add',
            communityid=community.communityid,
            channel_id=payload.broadcaster.id,
            profileid=profile.profileid,
            user_id=payload.user.id,
            occurred_at=payload.redeemed_at,
        )
        await self.data.redemption_add_events.insert(
            event_id=event_row['event_id'],
            redeem_id=payload.reward.id,
            redeem_title=payload.reward.title,
            redeem_cost=payload.reward.cost,
            redemption_id=payload.id,
            redemption_status=payload.status,
            message=payload.user_input,
        )

    @cmds.Component.listener()
    async def event_custom_redemption_update(self, payload: twitchio.ChannelPointsRedemptionUpdate):
        """Store a status update of a channel point redemption for a tracked channel."""
        if not await self._is_tracked(payload.broadcaster.id):
            return

        community = await self.bot.profiles.fetch_community(payload.broadcaster)
        profile = await self.bot.profiles.fetch_profile(payload.user)

        event_row = await self.data.events.insert(
            event_type='redemption_update',
            communityid=community.communityid,
            channel_id=payload.broadcaster.id,
            profileid=profile.profileid,
            user_id=payload.user.id,
        )
        await self.data.redemption_update_events.insert(
            event_id=event_row['event_id'],
            redeem_id=payload.reward.id,
            redeem_title=payload.reward.title,
            redeem_cost=payload.reward.cost,
            redemption_id=payload.id,
            redemption_status=payload.status,
            # NOTE(review): this stores the time the update was handled, not the
            # original redemption time - confirm that is what the column expects.
            redeemed_at=utc_now()
        )

    @cmds.Component.listener()
    async def event_follow(self, payload: twitchio.ChannelFollow):
        """Store a follow, together with the follower total fetched at that moment."""
        if not await self._is_tracked(payload.broadcaster.id):
            return

        community = await self.bot.profiles.fetch_community(payload.broadcaster)
        profile = await self.bot.profiles.fetch_profile(payload.user)

        # Compute follower count
        followers = await payload.broadcaster.fetch_followers()

        event_row = await self.data.events.insert(
            event_type='follow',
            communityid=community.communityid,
            channel_id=payload.broadcaster.id,
            profileid=profile.profileid,
            user_id=payload.user.id,
        )
        await self.data.follow_events.insert(
            event_id=event_row['event_id'],
            follower_count=followers.total
        )

    @cmds.Component.listener()
    async def event_bits_use(self, payload: twitchio.ChannelBitsUse):
        """Store a bits usage event, then dispatch ``bits_use`` with the stored rows."""
        if not await self._is_tracked(payload.broadcaster.id):
            return

        community = await self.bot.profiles.fetch_community(payload.broadcaster)
        profile = await self.bot.profiles.fetch_profile(payload.user)

        event_row = await self.data.events.insert(
            event_type='bits',
            communityid=community.communityid,
            channel_id=payload.broadcaster.id,
            profileid=profile.profileid,
            user_id=payload.user.id,
        )
        detail_row = await self.data.bits_events.insert(
            event_id=event_row['event_id'],
            bits=payload.bits,
            bits_type=payload.type,
            message=payload.text,
            # power_up is absent for bits uses that are not power-ups
            powerup_type=payload.power_up.type if payload.power_up else None
        )
        self.bot.safe_dispatch('bits_use', payload=(event_row, detail_row, payload))

    @cmds.Component.listener()
    async def event_subscription(self, payload: twitchio.ChannelSubscribe):
        """Store a subscription event, then dispatch ``subscription`` with the stored rows."""
        if not await self._is_tracked(payload.broadcaster.id):
            return

        community = await self.bot.profiles.fetch_community(payload.broadcaster)
        profile = await self.bot.profiles.fetch_profile(payload.user)

        event_row = await self.data.events.insert(
            event_type='subscribe',
            communityid=community.communityid,
            channel_id=payload.broadcaster.id,
            profileid=profile.profileid,
            user_id=payload.user.id,
        )
        detail_row = await self.data.subscribe_events.insert(
            event_id=event_row['event_id'],
            tier=int(payload.tier),
            gifted=payload.gift,
        )
        self.bot.safe_dispatch('subscription', payload=(event_row, detail_row, payload))

    @cmds.Component.listener()
    async def event_subscription_gift(self, payload: twitchio.ChannelSubscriptionGift):
        """Store a gifted-subscriptions event, then dispatch ``subscription_gift``.

        ``payload.user`` may be ``None``; in that case the event is stored
        without a profile id or user id.
        """
        if not await self._is_tracked(payload.broadcaster.id):
            return

        community = await self.bot.profiles.fetch_community(payload.broadcaster)

        gifter = payload.user
        if gifter is not None:
            profile = await self.bot.profiles.fetch_profile(gifter)
            pid = profile.profileid
        else:
            pid = None

        event_row = await self.data.events.insert(
            event_type='gift',
            communityid=community.communityid,
            channel_id=payload.broadcaster.id,
            profileid=pid,
            user_id=payload.user.id if payload.user else None,
        )
        detail_row = await self.data.gift_events.insert(
            event_id=event_row['event_id'],
            tier=int(payload.tier),
            gifted_count=payload.total,
        )
        self.bot.safe_dispatch('subscription_gift', payload=(event_row, detail_row, payload))

    @cmds.Component.listener()
    async def event_subscription_message(self, payload: twitchio.ChannelSubscriptionMessage):
        """Store a subscription message event, then dispatch ``subscription_message``."""
        if not await self._is_tracked(payload.broadcaster.id):
            return

        community = await self.bot.profiles.fetch_community(payload.broadcaster)
        profile = await self.bot.profiles.fetch_profile(payload.user)

        event_row = await self.data.events.insert(
            event_type='subscribe_message',
            communityid=community.communityid,
            channel_id=payload.broadcaster.id,
            profileid=profile.profileid,
            user_id=payload.user.id,
        )
        detail_row = await self.data.subscribe_message_events.insert(
            event_id=event_row['event_id'],
            tier=int(payload.tier),
            duration_months=payload.months,
            cumulative_months=payload.cumulative_months,
            streak_months=payload.streak_months,
            message=payload.text,
        )
        self.bot.safe_dispatch('subscription_message', payload=(event_row, detail_row, payload))

    @cmds.Component.listener()
    async def event_stream_online(self, payload: twitchio.StreamOnline):
        """Store a stream-start event for a tracked channel."""
        if not await self._is_tracked(payload.broadcaster.id):
            return

        community = await self.bot.profiles.fetch_community(payload.broadcaster)

        event_row = await self.data.events.insert(
            event_type='stream_online',
            communityid=community.communityid,
            channel_id=payload.broadcaster.id,
            occurred_at=payload.started_at,
        )
        await self.data.stream_online_events.insert(
            event_id=event_row['event_id'],
            stream_id=payload.id,
            stream_type=payload.type,
        )

    @cmds.Component.listener()
    async def event_stream_offline(self, payload: twitchio.StreamOffline):
        """Store a stream-end event for a tracked channel."""
        if not await self._is_tracked(payload.broadcaster.id):
            return

        community = await self.bot.profiles.fetch_community(payload.broadcaster)

        event_row = await self.data.events.insert(
            event_type='stream_offline',
            communityid=community.communityid,
            channel_id=payload.broadcaster.id,
        )
        await self.data.stream_offline_events.insert(
            event_id=event_row['event_id'],
        )

    @cmds.Component.listener()
    async def event_raid(self, payload: twitchio.ChannelRaid):
        """Record a raid from both sides: outgoing for the raider, incoming for the target.

        Each side is only stored if its own channel is tracked.
        """
        await self._event_raid_out(
            payload.from_broadcaster,
            payload.to_broadcaster,
            payload.viewer_count,
        )
        await self._event_raid_in(
            payload.to_broadcaster,
            payload.from_broadcaster,
            payload.viewer_count,
        )

    async def _event_raid_out(self, broadcaster: PartialUser, to_broadcaster: PartialUser, viewer_count: int):
        """Store an outgoing raid for ``broadcaster`` if that channel is tracked."""
        if not await self._is_tracked(broadcaster.id):
            return

        community = await self.bot.profiles.fetch_community(broadcaster)

        event_row = await self.data.events.insert(
            event_type='raidout',
            communityid=community.communityid,
            channel_id=broadcaster.id,
        )
        await self.data.raid_out_events.insert(
            event_id=event_row['event_id'],
            target_id=to_broadcaster.id,
            target_name=to_broadcaster.name,
            viewer_count=viewer_count
        )

    async def _event_raid_in(self, broadcaster: PartialUser, from_broadcaster: PartialUser, viewer_count: int):
        """Store an incoming raid for ``broadcaster`` if that channel is tracked."""
        if not await self._is_tracked(broadcaster.id):
            return

        community = await self.bot.profiles.fetch_community(broadcaster)

        event_row = await self.data.events.insert(
            event_type='raidin',
            communityid=community.communityid,
            channel_id=broadcaster.id,
        )
        await self.data.raid_in_events.insert(
            event_id=event_row['event_id'],
            source_id=from_broadcaster.id,
            source_name=from_broadcaster.name,
            viewer_count=viewer_count
        )

    @cmds.Component.listener()
    async def event_message(self, payload: twitchio.ChatMessage):
        """Store a chat message sent in a tracked channel."""
        if not await self._is_tracked(payload.broadcaster.id):
            return

        community = await self.bot.profiles.fetch_community(payload.broadcaster)
        profile = await self.bot.profiles.fetch_profile(payload.chatter)

        event_row = await self.data.events.insert(
            event_type='message',
            communityid=community.communityid,
            channel_id=payload.broadcaster.id,
            profileid=profile.profileid,
            user_id=payload.chatter.id,
        )
        await self.data.message_events.insert(
            event_id=event_row['event_id'],
            message_id=payload.id,
            message_type=payload.type,
            content=payload.text,
            source_channel_id=payload.source_id
        )

    # ----- Commands -----
    @cmds.command(name='starttracking')
    async def cmd_starttracking(self, ctx: cmds.Context):
        """Enable tracking for this channel and reply with an authorisation link.

        Only the channel's broadcaster may use this. The link requests the
        scopes needed for tracking plus every scope already stored for the
        channel.
        """
        if not self._invoked_by_broadcaster(ctx):
            await ctx.reply("Only the broadcaster can enable tracking.")
            return

        tracking = await TrackingChannel.fetch_or_create(ctx.channel.id, joined=True)
        if not tracking.joined:
            # The row already existed with tracking switched off
            await tracking.update(joined=True)

        rows = await self.bot.data.user_auth_scopes.select_where(userid=ctx.channel.id)
        scopes = Scopes([row['scope'] for row in rows])

        url = self.bot.get_auth_url(
            Scopes({
                Scopes.channel_read_subscriptions,
                Scopes.channel_read_redemptions,
                Scopes.bits_read,
                Scopes.channel_read_polls,
                Scopes.channel_read_vips,
                Scopes.moderator_read_followers,
                *scopes
            })
        )
        await ctx.reply(f"Tracking enabled! Please authorise me to track events in this channel: {url}")

    @cmds.command(name='stoptracking')
    async def cmd_stoptracking(self, ctx: cmds.Context):
        """Disable tracking for this channel. Only the broadcaster may use this."""
        if not self._invoked_by_broadcaster(ctx):
            await ctx.reply("Only the broadcaster can disable tracking.")
            return

        tracking = await TrackingChannel.fetch(ctx.channel.id)
        if not (tracking and tracking.joined):
            await ctx.reply("Event tracking is not enabled!")
            return

        await tracking.update(joined=False)
        # TODO: Actually disable the subscriptions instead of just on the next restart
        # This is tricky because some of the subscriptions may have been requested by other modules
        # Requires keeping track of the source of subscriptions, and having a central manager disable them when no-one is listening anymore.
        await ctx.reply("Event tracking has been disabled.")

    @cmds.command(name='join')
    async def cmd_join(self, ctx: cmds.Context):
        """Reply with the bot's default authorisation link."""
        url = self.bot.get_auth_url()
        await ctx.reply(f"Invite me to your channel with: {url}")