diff --git a/events/spectateFramesEvent.py b/events/spectateFramesEvent.py index dfbe6f4..5c618cd 100644 --- a/events/spectateFramesEvent.py +++ b/events/spectateFramesEvent.py @@ -1,30 +1,15 @@ from objects import glob from constants import serverPackets +from common.log import logUtils as log def handle(userToken, packetData): # get token data userID = userToken.userID # Send spectator frames to every spectator - glob.streams.broadcast("spect/{}".format(userID), serverPackets.spectatorFrames(packetData[7:])) - '''for i in userToken.spectators: - # Send to every user but host - if i != userID: - try: - # Get spectator token object - spectatorToken = glob.tokens.getTokenFromUserID(i) - - # Make sure the token exists - if spectatorToken is None: - raise exceptions.stopSpectating - - # Make sure this user is spectating us - if spectatorToken.spectating != userID: - raise exceptions.stopSpectating - - # Everything seems fine, send spectator frames to this spectator - spectatorToken.enqueue(serverPackets.spectatorFrames(packetData[7:])) - except exceptions.stopSpectating: - # Remove this user from spectators - userToken.removeSpectator(i) - userToken.enqueue(serverPackets.removeSpectator(i))''' + streamName = "spect/{}".format(userID) + glob.streams.broadcast(streamName, serverPackets.spectatorFrames(packetData[7:])) + log.debug("Broadcasting {}'s frames to {} clients".format( + userID, + len(glob.streams.streams[streamName].clients)) + ) \ No newline at end of file diff --git a/objects/osuToken.py b/objects/osuToken.py index fa071cb..9135edf 100644 --- a/objects/osuToken.py +++ b/objects/osuToken.py @@ -88,6 +88,7 @@ class token: # Locks self.processingLock = threading.Lock() # Acquired while there's an incoming packet from this user self._bufferLock = threading.Lock() # Acquired while writing to packets buffer + self._spectLock = threading.RLock() # Set stats self.updateCachedStats() @@ -187,42 +188,47 @@ class token: :param host: host osuToken object """ - # Stop spectating old client - self.stopSpectating() + try: + self._spectLock.acquire() - # Set new spectator host - self.spectating = host.token - self.spectatingUserID = host.userID + # Stop spectating old client + self.stopSpectating() - # Add us to host's spectator list - host.spectators.append(self.token) + # Set new spectator host + self.spectating = host.token + self.spectatingUserID = host.userID - # Create and join spectator stream - streamName = "spect/{}".format(host.userID) - glob.streams.add(streamName) - self.joinStream(streamName) - host.joinStream(streamName) + # Add us to host's spectator list + host.spectators.append(self.token) - # Send spectator join packet to host - host.enqueue(serverPackets.addSpectator(self.userID)) + # Create and join spectator stream + streamName = "spect/{}".format(host.userID) + glob.streams.add(streamName) + self.joinStream(streamName) + host.joinStream(streamName) - # Create and join #spectator (#spect_userid) channel - glob.channels.addTempChannel("#spect_{}".format(host.userID)) - chat.joinChannel(token=self, channel="#spect_{}".format(host.userID)) - if len(host.spectators) == 1: - # First spectator, send #spectator join to host too - chat.joinChannel(token=host, channel="#spect_{}".format(host.userID)) + # Send spectator join packet to host + host.enqueue(serverPackets.addSpectator(self.userID)) - # Send fellow spectator join to all clients - glob.streams.broadcast(streamName, serverPackets.fellowSpectatorJoined(self.userID)) + # Create and join #spectator (#spect_userid) channel + glob.channels.addTempChannel("#spect_{}".format(host.userID)) + chat.joinChannel(token=self, channel="#spect_{}".format(host.userID)) + if len(host.spectators) == 1: + # First spectator, send #spectator join to host too + chat.joinChannel(token=host, channel="#spect_{}".format(host.userID)) - # Get current spectators list - for i in host.spectators: - if i != self.token and i in glob.tokens.tokens: - self.enqueue(serverPackets.fellowSpectatorJoined(glob.tokens.tokens[i].userID)) + # Send fellow spectator join to all clients + glob.streams.broadcast(streamName, serverPackets.fellowSpectatorJoined(self.userID)) - # Log - log.info("{} is spectating {}".format(self.username, host.username)) + # Get current spectators list + for i in host.spectators: + if i != self.token and i in glob.tokens.tokens: + self.enqueue(serverPackets.fellowSpectatorJoined(glob.tokens.tokens[i].userID)) + + # Log + log.info("{} is spectating {}".format(self.username, host.username)) + finally: + self._spectLock.release() def stopSpectating(self): """ @@ -231,43 +237,48 @@ class token: :return: """ - # Remove our userID from host's spectators - if self.spectating is None: - return - if self.spectating in glob.tokens.tokens: - hostToken = glob.tokens.tokens[self.spectating] - else: - hostToken = None - streamName = "spect/{}".format(self.spectatingUserID) + try: + self._spectLock.acquire() - # Remove us from host's spectators list, - # leave spectator stream - # and end the spectator left packet to host - self.leaveStream(streamName) - if hostToken is not None: - hostToken.spectators.remove(self.token) - hostToken.enqueue(serverPackets.removeSpectator(self.userID)) + # Remove our userID from host's spectators + if self.spectating is None or self.spectatingUserID <= 0: + return + if self.spectating in glob.tokens.tokens: + hostToken = glob.tokens.tokens[self.spectating] + else: + hostToken = None + streamName = "spect/{}".format(self.spectatingUserID) - # and to all other spectators - for i in hostToken.spectators: - if i in glob.tokens.tokens: - glob.tokens.tokens[i].enqueue(serverPackets.fellowSpectatorLeft(self.userID)) + # Remove us from host's spectators list, + # leave spectator stream + # and end the spectator left packet to host + self.leaveStream(streamName) + if hostToken is not None: + hostToken.spectators.remove(self.token) + hostToken.enqueue(serverPackets.removeSpectator(self.userID)) - # If nobody is spectating the host anymore, close #spectator channel - # and remove host from spect stream too - if len(hostToken.spectators) == 0: - chat.partChannel(token=hostToken, channel="#spect_{}".format(hostToken.userID), kick=True) - hostToken.leaveStream(streamName) + # and to all other spectators + for i in hostToken.spectators: + if i in glob.tokens.tokens: + glob.tokens.tokens[i].enqueue(serverPackets.fellowSpectatorLeft(self.userID)) - # Console output - log.info("{} is no longer spectating {}. Current spectators: {}".format(self.username, self.spectatingUserID, hostToken.spectators)) + # If nobody is spectating the host anymore, close #spectator channel + # and remove host from spect stream too + if len(hostToken.spectators) == 0: + chat.partChannel(token=hostToken, channel="#spect_{}".format(hostToken.userID), kick=True) + hostToken.leaveStream(streamName) - # Part #spectator channel - chat.partChannel(token=self, channel="#spect_{}".format(self.spectatingUserID), kick=True) + # Console output + log.info("{} is no longer spectating {}. Current spectators: {}".format(self.username, self.spectatingUserID, hostToken.spectators)) - # Set our spectating user to 0 - self.spectating = None - self.spectatingUserID = 0 + # Part #spectator channel + chat.partChannel(token=self, channel="#spect_{}".format(self.spectatingUserID), kick=True) + + # Set our spectating user to 0 + self.spectating = None + self.spectatingUserID = 0 + finally: + self._spectLock.release() def updatePingTime(self): """