Initial Plugin Commit
This commit is contained in:
477
tracker/component.py
Normal file
477
tracker/component.py
Normal file
@@ -0,0 +1,477 @@
|
||||
from typing import Optional
|
||||
import random
|
||||
import twitchio
|
||||
from twitchio import PartialUser, Scopes, eventsub
|
||||
from twitchio.ext import commands as cmds
|
||||
|
||||
from datamodels import BotChannel, Communities, UserProfile
|
||||
from meta import CrocBot
|
||||
from utils.lib import utc_now
|
||||
|
||||
from . import logger
|
||||
from .data import EventData, TrackingChannel
|
||||
|
||||
|
||||
class TrackerComponent(cmds.Component):
|
||||
def __init__(self, bot: CrocBot):
|
||||
self.bot = bot
|
||||
self.data = bot.dbconn.load_registry(EventData())
|
||||
print(self.__all_listeners__)
|
||||
|
||||
# One command, which sends join link to start tracking
|
||||
# Utility for fetch_or_create community and profiles and setting names
|
||||
# Then just a stack of event listeners.
|
||||
|
||||
# ----- API -----
|
||||
async def component_load(self):
|
||||
await self.data.init()
|
||||
|
||||
async def component_teardown(self):
|
||||
pass
|
||||
|
||||
# ----- Methods -----
|
||||
async def start_tracking(self, channel: TrackingChannel):
|
||||
# TODO: Make sure that we aren't trying to make duplicate subscriptions here
|
||||
logger.debug(
|
||||
"Initialising event tracking for %s",
|
||||
channel.userid
|
||||
)
|
||||
# Get associated auth scopes
|
||||
rows = await self.bot.data.user_auth_scopes.select_where(userid=channel.userid)
|
||||
scopes = Scopes([row['scope'] for row in rows])
|
||||
|
||||
# Build subscription payloads based on available scopes
|
||||
subs = []
|
||||
usersubs = []
|
||||
subcls = []
|
||||
if Scopes.channel_read_redemptions in scopes or Scopes.channel_manage_redemptions in scopes:
|
||||
subcls.append(eventsub.ChannelPointsRedeemAddSubscription)
|
||||
subcls.append(eventsub.ChannelPointsRedeemUpdateSubscription)
|
||||
if Scopes.bits_read in scopes:
|
||||
subcls.append(eventsub.ChannelBitsUseSubscription)
|
||||
subcls.append(eventsub.ChannelCheerSubscription)
|
||||
if Scopes.channel_read_subscriptions in scopes:
|
||||
subcls.extend((
|
||||
eventsub.ChannelSubscribeSubscription,
|
||||
eventsub.ChannelSubscribeMessageSubscription,
|
||||
eventsub.ChannelSubscriptionGiftSubscription,
|
||||
))
|
||||
if Scopes.channel_read_polls in scopes or Scopes.channel_manage_polls in scopes:
|
||||
subcls.append(eventsub.ChannelPollEndSubscription)
|
||||
if Scopes.channel_read_vips in scopes or Scopes.channel_manage_vips in scopes:
|
||||
subcls.extend((
|
||||
eventsub.ChannelVIPAddSubscription,
|
||||
eventsub.ChannelVIPRemoveSubscription,
|
||||
))
|
||||
|
||||
subcls.extend((
|
||||
eventsub.StreamOnlineSubscription,
|
||||
eventsub.StreamOfflineSubscription,
|
||||
eventsub.ChannelUpdateSubscription,
|
||||
))
|
||||
|
||||
for subbr in subcls:
|
||||
subs.append(subbr(broadcaster_user_id=channel.userid))
|
||||
|
||||
# This isn't needed because joining the channel means we have these
|
||||
# Including them for completeness though
|
||||
# if Scopes.chat_read in scopes or Scopes.channel_bot in scopes:
|
||||
# subs.append(
|
||||
# eventsub.ChatMessageSubscription(
|
||||
# broadcaster_user_id=channel.userid,
|
||||
# user_id=self.bot.bot_id,
|
||||
# )
|
||||
# )
|
||||
if Scopes.moderator_read_followers in scopes:
|
||||
usersubs.append(
|
||||
eventsub.ChannelFollowSubscription(
|
||||
broadcaster_user_id=channel.userid,
|
||||
moderator_user_id=channel.userid,
|
||||
)
|
||||
)
|
||||
|
||||
subs.extend((
|
||||
eventsub.ChannelRaidSubscription(to_broadcaster_user_id=channel.userid),
|
||||
eventsub.ChannelRaidSubscription(from_broadcaster_user_id=channel.userid),
|
||||
))
|
||||
|
||||
responses = []
|
||||
for sub in subs:
|
||||
try:
|
||||
if self.bot.using_webhooks:
|
||||
resp = await self.bot.subscribe_webhook(sub)
|
||||
else:
|
||||
resp = await self.bot.subscribe_websocket(sub)
|
||||
responses.append(resp)
|
||||
except Exception:
|
||||
logger.exception("Failed to subscribe to %s", str(sub))
|
||||
for sub in usersubs:
|
||||
try:
|
||||
if self.bot.using_webhooks:
|
||||
resp = await self.bot.subscribe_webhook(sub)
|
||||
else:
|
||||
resp = await self.bot.subscribe_websocket(sub, token_for=channel.userid, as_bot=False)
|
||||
responses.append(resp)
|
||||
except Exception:
|
||||
logger.exception("Failed to subscribe to %s", str(sub))
|
||||
|
||||
logger.info("Finished tracker subscription to %s: %s", channel.userid, ', '.join(map(str, responses)))
|
||||
|
||||
# ----- Events -----
|
||||
|
||||
@cmds.Component.listener()
|
||||
async def event_safe_channel_joined(self, payload: BotChannel):
|
||||
# Check if the channel is tracked
|
||||
# If it is, call start_tracking
|
||||
tracked = await TrackingChannel.fetch(payload.userid)
|
||||
if tracked and tracked.joined:
|
||||
await self.start_tracking(tracked)
|
||||
|
||||
@cmds.Component.listener()
|
||||
async def event_custom_redemption_add(self, payload: twitchio.ChannelPointsRedemptionAdd):
|
||||
tracked = await TrackingChannel.fetch(payload.broadcaster.id)
|
||||
if tracked and tracked.joined:
|
||||
community = await self.bot.community_fetch(twitchid=payload.broadcaster.id, name=payload.broadcaster.name)
|
||||
cid = community.communityid
|
||||
profile = await self.bot.profile_fetch(
|
||||
twitchid=payload.user.id, name=payload.user.name,
|
||||
)
|
||||
pid = profile.profileid
|
||||
|
||||
event_row = await self.data.events.insert(
|
||||
event_type='redemption_add',
|
||||
communityid=cid,
|
||||
channel_id=payload.broadcaster.id,
|
||||
profileid=pid,
|
||||
user_id=payload.user.id,
|
||||
occurred_at=payload.redeemed_at,
|
||||
)
|
||||
detail_row = await self.data.redemption_add_events.insert(
|
||||
event_id=event_row['event_id'],
|
||||
redeem_id=payload.reward.id,
|
||||
redeem_title=payload.reward.title,
|
||||
redeem_cost=payload.reward.cost,
|
||||
redemption_id=payload.id,
|
||||
redemption_status=payload.status,
|
||||
message=payload.user_input,
|
||||
)
|
||||
|
||||
@cmds.Component.listener()
|
||||
async def event_custom_redemption_update(self, payload: twitchio.ChannelPointsRedemptionUpdate):
|
||||
tracked = await TrackingChannel.fetch(payload.broadcaster.id)
|
||||
if tracked and tracked.joined:
|
||||
community = await self.bot.community_fetch(twitchid=payload.broadcaster.id, name=payload.broadcaster.name)
|
||||
cid = community.communityid
|
||||
profile = await self.bot.profile_fetch(
|
||||
twitchid=payload.user.id, name=payload.user.name,
|
||||
)
|
||||
pid = profile.profileid
|
||||
|
||||
event_row = await self.data.events.insert(
|
||||
event_type='redemption_update',
|
||||
communityid=cid,
|
||||
channel_id=payload.broadcaster.id,
|
||||
profileid=pid,
|
||||
user_id=payload.user.id,
|
||||
)
|
||||
detail_row = await self.data.redemption_update_events.insert(
|
||||
event_id=event_row['event_id'],
|
||||
redeem_id=payload.reward.id,
|
||||
redeem_title=payload.reward.title,
|
||||
redeem_cost=payload.reward.cost,
|
||||
redemption_id=payload.id,
|
||||
redemption_status=payload.status,
|
||||
redeemed_at=utc_now()
|
||||
)
|
||||
|
||||
@cmds.Component.listener()
|
||||
async def event_follow(self, payload: twitchio.ChannelFollow):
|
||||
tracked = await TrackingChannel.fetch(payload.broadcaster.id)
|
||||
if tracked and tracked.joined:
|
||||
community = await self.bot.community_fetch(twitchid=payload.broadcaster.id, name=payload.broadcaster.name)
|
||||
cid = community.communityid
|
||||
profile = await self.bot.profile_fetch(
|
||||
twitchid=payload.user.id, name=payload.user.name,
|
||||
)
|
||||
pid = profile.profileid
|
||||
|
||||
# Computer follower count
|
||||
followers = await payload.broadcaster.fetch_followers()
|
||||
|
||||
event_row = await self.data.events.insert(
|
||||
event_type='follow',
|
||||
communityid=cid,
|
||||
channel_id=payload.broadcaster.id,
|
||||
profileid=pid,
|
||||
user_id=payload.user.id,
|
||||
)
|
||||
detail_row = await self.data.follow_events.insert(
|
||||
event_id=event_row['event_id'],
|
||||
follower_count=followers.total
|
||||
)
|
||||
|
||||
@cmds.Component.listener()
|
||||
async def event_bits_use(self, payload: twitchio.ChannelBitsUse):
|
||||
tracked = await TrackingChannel.fetch(payload.broadcaster.id)
|
||||
if tracked and tracked.joined:
|
||||
community = await self.bot.community_fetch(twitchid=payload.broadcaster.id, name=payload.broadcaster.name)
|
||||
cid = community.communityid
|
||||
profile = await self.bot.profile_fetch(
|
||||
twitchid=payload.user.id, name=payload.user.name,
|
||||
)
|
||||
pid = profile.profileid
|
||||
|
||||
event_row = await self.data.events.insert(
|
||||
event_type='bits',
|
||||
communityid=cid,
|
||||
channel_id=payload.broadcaster.id,
|
||||
profileid=pid,
|
||||
user_id=payload.user.id,
|
||||
)
|
||||
detail_row = await self.data.bits_events.insert(
|
||||
event_id=event_row['event_id'],
|
||||
bits=payload.bits,
|
||||
bits_type=payload.type,
|
||||
message=payload.text,
|
||||
powerup_type=payload.power_up.type if payload.power_up else None
|
||||
)
|
||||
self.bot.safe_dispatch('bits_use', payload=(event_row, detail_row, payload))
|
||||
|
||||
@cmds.Component.listener()
|
||||
async def event_subscription(self, payload: twitchio.ChannelSubscribe):
|
||||
tracked = await TrackingChannel.fetch(payload.broadcaster.id)
|
||||
if tracked and tracked.joined:
|
||||
community = await self.bot.community_fetch(twitchid=payload.broadcaster.id, name=payload.broadcaster.name)
|
||||
cid = community.communityid
|
||||
profile = await self.bot.profile_fetch(
|
||||
twitchid=payload.user.id, name=payload.user.name,
|
||||
)
|
||||
pid = profile.profileid
|
||||
|
||||
event_row = await self.data.events.insert(
|
||||
event_type='subscribe',
|
||||
communityid=cid,
|
||||
channel_id=payload.broadcaster.id,
|
||||
profileid=pid,
|
||||
user_id=payload.user.id,
|
||||
)
|
||||
detail_row = await self.data.subscribe_events.insert(
|
||||
event_id=event_row['event_id'],
|
||||
tier=int(payload.tier),
|
||||
gifted=payload.gift,
|
||||
)
|
||||
self.bot.safe_dispatch('subscription', payload=(event_row, detail_row, payload))
|
||||
|
||||
@cmds.Component.listener()
|
||||
async def event_subscription_gift(self, payload: twitchio.ChannelSubscriptionGift):
|
||||
tracked = await TrackingChannel.fetch(payload.broadcaster.id)
|
||||
if tracked and tracked.joined:
|
||||
community = await self.bot.community_fetch(twitchid=payload.broadcaster.id, name=payload.broadcaster.name)
|
||||
cid = community.communityid
|
||||
if payload.user is not None:
|
||||
profile = await self.bot.profile_fetch(
|
||||
twitchid=payload.user.id, name=payload.user.name,
|
||||
)
|
||||
pid = profile.profileid
|
||||
else:
|
||||
pid = None
|
||||
|
||||
event_row = await self.data.events.insert(
|
||||
event_type='gift',
|
||||
communityid=cid,
|
||||
channel_id=payload.broadcaster.id,
|
||||
profileid=pid,
|
||||
user_id=payload.user.id if payload.user else None,
|
||||
)
|
||||
detail_row = await self.data.gift_events.insert(
|
||||
event_id=event_row['event_id'],
|
||||
tier=int(payload.tier),
|
||||
gifted_count=payload.total,
|
||||
)
|
||||
self.bot.safe_dispatch('subscription_gift', payload=(event_row, detail_row, payload))
|
||||
|
||||
@cmds.Component.listener()
|
||||
async def event_subscription_message(self, payload: twitchio.ChannelSubscriptionMessage):
|
||||
tracked = await TrackingChannel.fetch(payload.broadcaster.id)
|
||||
if tracked and tracked.joined:
|
||||
community = await self.bot.community_fetch(twitchid=payload.broadcaster.id, name=payload.broadcaster.name)
|
||||
cid = community.communityid
|
||||
profile = await self.bot.profile_fetch(
|
||||
twitchid=payload.user.id, name=payload.user.name,
|
||||
)
|
||||
pid = profile.profileid
|
||||
|
||||
event_row = await self.data.events.insert(
|
||||
event_type='subscribe_message',
|
||||
communityid=cid,
|
||||
channel_id=payload.broadcaster.id,
|
||||
profileid=pid,
|
||||
user_id=payload.user.id,
|
||||
)
|
||||
detail_row = await self.data.subscribe_message_events.insert(
|
||||
event_id=event_row['event_id'],
|
||||
tier=int(payload.tier),
|
||||
duration_months=payload.months,
|
||||
cumulative_months=payload.cumulative_months,
|
||||
streak_months=payload.streak_months,
|
||||
message=payload.text,
|
||||
)
|
||||
self.bot.safe_dispatch('subscription_message', payload=(event_row, detail_row, payload))
|
||||
|
||||
@cmds.Component.listener()
|
||||
async def event_stream_online(self, payload: twitchio.StreamOnline):
|
||||
tracked = await TrackingChannel.fetch(payload.broadcaster.id)
|
||||
if tracked and tracked.joined:
|
||||
community = await self.bot.community_fetch(twitchid=payload.broadcaster.id, name=payload.broadcaster.name)
|
||||
cid = community.communityid
|
||||
|
||||
event_row = await self.data.events.insert(
|
||||
event_type='stream_online',
|
||||
communityid=cid,
|
||||
channel_id=payload.broadcaster.id,
|
||||
occurred_at=payload.started_at,
|
||||
)
|
||||
detail_row = await self.data.stream_online_events.insert(
|
||||
event_id=event_row['event_id'],
|
||||
stream_id=payload.id,
|
||||
stream_type=payload.type,
|
||||
)
|
||||
|
||||
@cmds.Component.listener()
|
||||
async def event_stream_offline(self, payload: twitchio.StreamOffline):
|
||||
tracked = await TrackingChannel.fetch(payload.broadcaster.id)
|
||||
if tracked and tracked.joined:
|
||||
community = await self.bot.community_fetch(twitchid=payload.broadcaster.id, name=payload.broadcaster.name)
|
||||
cid = community.communityid
|
||||
|
||||
event_row = await self.data.events.insert(
|
||||
event_type='stream_offline',
|
||||
communityid=cid,
|
||||
channel_id=payload.broadcaster.id,
|
||||
)
|
||||
detail_row = await self.data.stream_offline_events.insert(
|
||||
event_id=event_row['event_id'],
|
||||
)
|
||||
|
||||
@cmds.Component.listener()
|
||||
async def event_raid(self, payload: twitchio.ChannelRaid):
|
||||
await self._event_raid_out(
|
||||
payload.from_broadcaster,
|
||||
payload.to_broadcaster,
|
||||
payload.viewer_count,
|
||||
)
|
||||
await self._event_raid_in(
|
||||
payload.to_broadcaster,
|
||||
payload.from_broadcaster,
|
||||
payload.viewer_count,
|
||||
)
|
||||
|
||||
async def _event_raid_out(self, broadcaster: PartialUser, to_broadcaster: PartialUser, viewer_count: int):
|
||||
tracked = await TrackingChannel.fetch(broadcaster.id)
|
||||
if tracked and tracked.joined:
|
||||
community = await self.bot.community_fetch(twitchid=broadcaster.id, name=broadcaster.name)
|
||||
cid = community.communityid
|
||||
|
||||
event_row = await self.data.events.insert(
|
||||
event_type='raidout',
|
||||
communityid=cid,
|
||||
channel_id=broadcaster.id,
|
||||
)
|
||||
detail_row = await self.data.raid_out_events.insert(
|
||||
event_id=event_row['event_id'],
|
||||
target_id=to_broadcaster.id,
|
||||
target_name=to_broadcaster.name,
|
||||
viewer_count=viewer_count
|
||||
)
|
||||
|
||||
async def _event_raid_in(self, broadcaster: PartialUser, from_broadcaster: PartialUser, viewer_count: int):
|
||||
tracked = await TrackingChannel.fetch(broadcaster.id)
|
||||
if tracked and tracked.joined:
|
||||
community = await self.bot.community_fetch(twitchid=broadcaster.id, name=broadcaster.name)
|
||||
cid = community.communityid
|
||||
|
||||
event_row = await self.data.events.insert(
|
||||
event_type='raidin',
|
||||
communityid=cid,
|
||||
channel_id=broadcaster.id,
|
||||
)
|
||||
detail_row = await self.data.raid_in_events.insert(
|
||||
event_id=event_row['event_id'],
|
||||
source_id=from_broadcaster.id,
|
||||
source_name=from_broadcaster.name,
|
||||
viewer_count=viewer_count
|
||||
)
|
||||
|
||||
@cmds.Component.listener()
|
||||
async def event_message(self, payload: twitchio.ChatMessage):
|
||||
tracked = await TrackingChannel.fetch(payload.broadcaster.id)
|
||||
if tracked and tracked.joined:
|
||||
community = await self.bot.community_fetch(twitchid=payload.broadcaster.id, name=payload.broadcaster.name)
|
||||
cid = community.communityid
|
||||
profile = await self.bot.profile_fetch(
|
||||
twitchid=payload.chatter.id, name=payload.chatter.name,
|
||||
)
|
||||
pid = profile.profileid
|
||||
|
||||
event_row = await self.data.events.insert(
|
||||
event_type='message',
|
||||
communityid=cid,
|
||||
channel_id=payload.broadcaster.id,
|
||||
profileid=pid,
|
||||
user_id=payload.chatter.id,
|
||||
)
|
||||
detail_row = await self.data.message_events.insert(
|
||||
event_id=event_row['event_id'],
|
||||
message_id=payload.id,
|
||||
message_type=payload.type,
|
||||
content=payload.text,
|
||||
source_channel_id=payload.source_id
|
||||
)
|
||||
|
||||
# ----- Commands -----
|
||||
@cmds.command(name='starttracking')
|
||||
async def cmd_starttracking(self, ctx: cmds.Context):
|
||||
if ctx.broadcaster:
|
||||
tracking = await TrackingChannel.fetch_or_create(ctx.channel.id, joined=True)
|
||||
if not tracking.joined:
|
||||
await tracking.update(joined=True)
|
||||
|
||||
rows = await self.bot.data.user_auth_scopes.select_where(userid=ctx.channel.id)
|
||||
scopes = Scopes([row['scope'] for row in rows])
|
||||
|
||||
url = self.bot.get_auth_url(
|
||||
Scopes({
|
||||
Scopes.channel_read_subscriptions,
|
||||
Scopes.channel_read_redemptions,
|
||||
Scopes.bits_read,
|
||||
Scopes.channel_read_polls,
|
||||
Scopes.channel_read_vips,
|
||||
Scopes.moderator_read_followers,
|
||||
*scopes
|
||||
})
|
||||
)
|
||||
await ctx.reply(f"Tracking enabled! Please authorise me to track events in this channel: {url}")
|
||||
else:
|
||||
await ctx.reply("Only the broadcaster can enable tracking.")
|
||||
|
||||
@cmds.command(name='stoptracking')
|
||||
async def cmd_stoptracking(self, ctx: cmds.Context):
|
||||
if ctx.broadcaster:
|
||||
tracking = await TrackingChannel.fetch(ctx.channel.id)
|
||||
if tracking and tracking.joined:
|
||||
await tracking.update(joined=False)
|
||||
# TODO: Actually disable the subscriptions instead of just on the next restart
|
||||
# This is tricky because some of the subscriptions may have been requested by other modules
|
||||
# Requires keeping track of the source of subscriptions, and having a central manager disable them when no-one is listening anymore.
|
||||
pass
|
||||
await ctx.reply("Event tracking has been disabled.")
|
||||
else:
|
||||
await ctx.reply("Event tracking is not enabled!")
|
||||
|
||||
else:
|
||||
await ctx.reply("Only the broadcaster can enable tracking.")
|
||||
|
||||
@cmds.command(name='join')
|
||||
async def cmd_join(self, ctx: cmds.Context):
|
||||
url = self.bot.get_auth_url()
|
||||
await ctx.reply(f"Invite me to your channel with: {url}")
|
||||
Reference in New Issue
Block a user