diff --git a/events/startSpectatingEvent.py b/events/startSpectatingEvent.py index 495705d..06ccd3f 100644 --- a/events/startSpectatingEvent.py +++ b/events/startSpectatingEvent.py @@ -8,6 +8,10 @@ def handle(userToken, packetData): # Start spectating packet packetData = clientPackets.startSpectating(packetData) + # If the user id is less than 0, treat this as a stop spectating packet + if packetData["userID"] < 0: + userToken.stopSpectating() + # Get host token targetToken = glob.tokens.getTokenFromUserID(packetData["userID"]) if targetToken is None: diff --git a/handlers/mainHandler.pyx b/handlers/mainHandler.pyx index e3cf03b..0957bee 100644 --- a/handlers/mainHandler.pyx +++ b/handlers/mainHandler.pyx @@ -93,7 +93,7 @@ class handler(requestsManager.asyncRequestHandler): # Token exists, get its object and lock it userToken = glob.tokens.tokens[requestTokenString] - userToken.lock.acquire() + userToken.processingLock.acquire() # Keep reading packets until everything has been read while pos < len(requestData): @@ -205,8 +205,8 @@ class handler(requestsManager.asyncRequestHandler): if userToken is not None: # Update ping time for timeout userToken.updatePingTime() - # Release token lock - userToken.lock.release() + # Release processing lock + userToken.processingLock.release() # Delete token if kicked if userToken.kicked: glob.tokens.deleteToken(userToken) diff --git a/objects/osuToken.py b/objects/osuToken.py index 68471eb..c42929d 100644 --- a/objects/osuToken.py +++ b/objects/osuToken.py @@ -39,8 +39,6 @@ class token: self.loginTime = int(time.time()) self.pingTime = self.loginTime self.timeOffset = timeOffset - self.lock = threading.Lock() - self.bufferLock = threading.Lock() self.streams = [] self.tournament = tournament self.messagesBuffer = [] @@ -87,6 +85,15 @@ class token: else: self.token = str(uuid.uuid4()) + # Locks + self.processingLock = threading.Lock() # Acquired while there's an incoming packet from this user + self._bufferLock = threading.Lock() # Acquired while writing to packets buffer + self._internalLock = threading.Lock() # Acquired while performing internal operations on this token + self._chatLock = threading.Lock() # Acquired while performing chat operations + self._streamsLock = threading.Lock() # Acquired while joining/leaving streams + self._spectatorLock = threading.Lock() # Acquired while starting/stopping spectating + self._multiplayerLock = threading.Lock()# Acquired while joining/leaving streams + # Set stats self.updateCachedStats() @@ -105,7 +112,7 @@ class token: """ try: # Acquire the buffer lock - self.bufferLock.acquire() + self._bufferLock.acquire() # Never enqueue for IRC clients or Foka if self.irc or self.userID < 999: @@ -118,15 +125,15 @@ class token: log.warning("{}'s packets buffer is above 10M!! Lost some data!".format(self.username)) finally: # Release the buffer lock - self.bufferLock.release() + self._bufferLock.release() def resetQueue(self): """Resets the queue. Call when enqueued packets have been sent""" try: - self.bufferLock.acquire() + self._bufferLock.acquire() self.queue = bytes() finally: - self.bufferLock.release() + self._bufferLock.release() def joinChannel(self, channelObject): """ @@ -136,13 +143,17 @@ class token: :raises: exceptions.userAlreadyInChannelException() exceptions.channelNoPermissionsException() """ - if channelObject.name in self.joinedChannels: - raise exceptions.userAlreadyInChannelException() - if channelObject.publicRead == False and self.admin == False: - raise exceptions.channelNoPermissionsException() - self.joinedChannels.append(channelObject.name) - self.joinStream("chat/{}".format(channelObject.name)) - self.enqueue(serverPackets.channelJoinSuccess(self.userID, channelObject.clientName)) + try: + self._chatLock.acquire() + if channelObject.name in self.joinedChannels: + raise exceptions.userAlreadyInChannelException() + if channelObject.publicRead == False and self.admin == False: + raise exceptions.channelNoPermissionsException() + self.joinedChannels.append(channelObject.name) + self.joinStream("chat/{}".format(channelObject.name)) + self.enqueue(serverPackets.channelJoinSuccess(self.userID, channelObject.clientName)) + finally: + self._chatLock.release() def partChannel(self, channelObject): """ @@ -150,8 +161,12 @@ class token: :param channelObject: channel object """ - self.joinedChannels.remove(channelObject.name) - self.leaveStream("chat/{}".format(channelObject.name)) + try: + self._chatLock.acquire() + self.joinedChannels.remove(channelObject.name) + self.leaveStream("chat/{}".format(channelObject.name)) + finally: + self._chatLock.release() def setLocation(self, latitude, longitude): """ @@ -185,42 +200,48 @@ class token: :param host: host osuToken object """ - # Stop spectating old client - self.stopSpectating() + try: + # Stop spectating old client + self.stopSpectating() - # Set new spectator host - self.spectating = host.token - self.spectatingUserID = host.userID + # Acquire token's lock + self._spectatorLock.acquire() - # Add us to host's spectator list - host.spectators.append(self.token) + # Set new spectator host + self.spectating = host.token + self.spectatingUserID = host.userID - # Create and join spectator stream - streamName = "spect/{}".format(host.userID) - glob.streams.add(streamName) - self.joinStream(streamName) - host.joinStream(streamName) + # Add us to host's spectator list + host.spectators.append(self.token) - # Send spectator join packet to host - host.enqueue(serverPackets.addSpectator(self.userID)) + # Create and join spectator stream + streamName = "spect/{}".format(host.userID) + glob.streams.add(streamName) + self.joinStream(streamName) + host.joinStream(streamName) - # Create and join #spectator (#spect_userid) channel - glob.channels.addTempChannel("#spect_{}".format(host.userID)) - chat.joinChannel(token=self, channel="#spect_{}".format(host.userID)) - if len(host.spectators) == 1: - # First spectator, send #spectator join to host too - chat.joinChannel(token=host, channel="#spect_{}".format(host.userID)) + # Send spectator join packet to host + host.enqueue(serverPackets.addSpectator(self.userID)) - # Send fellow spectator join to all clients - glob.streams.broadcast(streamName, serverPackets.fellowSpectatorJoined(self.userID)) + # Create and join #spectator (#spect_userid) channel + glob.channels.addTempChannel("#spect_{}".format(host.userID)) + chat.joinChannel(token=self, channel="#spect_{}".format(host.userID)) + if len(host.spectators) == 1: + # First spectator, send #spectator join to host too + chat.joinChannel(token=host, channel="#spect_{}".format(host.userID)) - # Get current spectators list - for i in host.spectators: - if i != self.token and i in glob.tokens.tokens: - self.enqueue(serverPackets.fellowSpectatorJoined(glob.tokens.tokens[i].userID)) + # Send fellow spectator join to all clients + glob.streams.broadcast(streamName, serverPackets.fellowSpectatorJoined(self.userID)) - # Log - log.info("{} is spectating {}".format(self.username, host.username)) + # Get current spectators list + for i in host.spectators: + if i != self.token and i in glob.tokens.tokens: + self.enqueue(serverPackets.fellowSpectatorJoined(glob.tokens.tokens[i].userID)) + + # Log + log.info("{} is spectating {}".format(self.username, host.username)) + finally: + self._spectatorLock.release() def stopSpectating(self): """ @@ -229,43 +250,49 @@ class token: :return: """ - # Remove our userID from host's spectators - if self.spectating is None: - return - if self.spectating in glob.tokens.tokens: - hostToken = glob.tokens.tokens[self.spectating] - else: - hostToken = None - streamName = "spect/{}".format(self.spectatingUserID) + try: + # Acquire token lock + self._spectatorLock.acquire() - # Remove us from host's spectators list, - # leave spectator stream - # and end the spectator left packet to host - self.leaveStream(streamName) - if hostToken is not None: - hostToken.spectators.remove(self.token) - hostToken.enqueue(serverPackets.removeSpectator(self.userID)) + # Remove our userID from host's spectators + if self.spectating is None: + return + if self.spectating in glob.tokens.tokens: + hostToken = glob.tokens.tokens[self.spectating] + else: + hostToken = None + streamName = "spect/{}".format(self.spectatingUserID) - # and to all other spectators - for i in hostToken.spectators: - if i in glob.tokens.tokens: - glob.tokens.tokens[i].enqueue(serverPackets.fellowSpectatorLeft(self.userID)) + # Remove us from host's spectators list, + # leave spectator stream + # and end the spectator left packet to host + self.leaveStream(streamName) + if hostToken is not None: + hostToken.spectators.remove(self.token) + hostToken.enqueue(serverPackets.removeSpectator(self.userID)) - # If nobody is spectating the host anymore, close #spectator channel - # and remove host from spect stream too - if len(hostToken.spectators) == 0: - chat.partChannel(token=hostToken, channel="#spect_{}".format(hostToken.userID), kick=True) - hostToken.leaveStream(streamName) + # and to all other spectators + for i in hostToken.spectators: + if i in glob.tokens.tokens: + glob.tokens.tokens[i].enqueue(serverPackets.fellowSpectatorLeft(self.userID)) - # Console output - log.info("{} is no longer spectating {}. Current spectators: {}".format(self.username, self.spectatingUserID, hostToken.spectators)) + # If nobody is spectating the host anymore, close #spectator channel + # and remove host from spect stream too + if len(hostToken.spectators) == 0: + chat.partChannel(token=hostToken, channel="#spect_{}".format(hostToken.userID), kick=True) + hostToken.leaveStream(streamName) - # Part #spectator channel - chat.partChannel(token=self, channel="#spect_{}".format(self.spectatingUserID), kick=True) + # Console output + log.info("{} is no longer spectating {}. Current spectators: {}".format(self.username, self.spectatingUserID, hostToken.spectators)) - # Set our spectating user to 0 - self.spectating = None - self.spectatingUserID = 0 + # Part #spectator channel + chat.partChannel(token=self, channel="#spect_{}".format(self.spectatingUserID), kick=True) + + # Set our spectating user to 0 + self.spectating = None + self.spectatingUserID = 0 + finally: + self._spectatorLock.release() def updatePingTime(self): """ @@ -282,35 +309,40 @@ class token: :param matchID: new match ID :return: """ - # Make sure the match exists - if matchID not in glob.matches.matches: - return + try: + self._multiplayerLock.acquire() - # Match exists, get object - match = glob.matches.matches[matchID] + # Make sure the match exists + if matchID not in glob.matches.matches: + return - # Stop spectating - self.stopSpectating() + # Match exists, get object + match = glob.matches.matches[matchID] - # Leave other matches - if self.matchID > -1 and self.matchID != matchID: - self.leaveMatch() + # Stop spectating + self.stopSpectating() - # Try to join match - joined = match.userJoin(self) - if not joined: - self.enqueue(serverPackets.matchJoinFail()) - return + # Leave other matches + if self.matchID > -1 and self.matchID != matchID: + self.leaveMatch() - # Set matchID, join stream, channel and send packet - self.matchID = matchID - self.joinStream(match.streamName) - chat.joinChannel(token=self, channel="#multi_{}".format(self.matchID)) - self.enqueue(serverPackets.matchJoinSuccess(matchID)) + # Try to join match + joined = match.userJoin(self) + if not joined: + self.enqueue(serverPackets.matchJoinFail()) + return - # Alert the user if we have just joined a tourney match - if match.isTourney: - self.enqueue(serverPackets.notification("You are now in a tournament match.")) + # Set matchID, join stream, channel and send packet + self.matchID = matchID + self.joinStream(match.streamName) + chat.joinChannel(token=self, channel="#multi_{}".format(self.matchID)) + self.enqueue(serverPackets.matchJoinSuccess(matchID)) + + # Alert the user if we have just joined a tourney match + if match.isTourney: + self.enqueue(serverPackets.notification("You are now in a tournament match.")) + finally: + self._multiplayerLock.release() def leaveMatch(self): """ @@ -318,28 +350,33 @@ class token: :return: """ - # Make sure we are in a match - if self.matchID == -1: - return + try: + self._multiplayerLock.acquire() - # Part #multiplayer channel and streams (/ and /playing) - chat.partChannel(token=self, channel="#multi_{}".format(self.matchID), kick=True) - self.leaveStream("multi/{}".format(self.matchID)) - self.leaveStream("multi/{}/playing".format(self.matchID)) # optional + # Make sure we are in a match + if self.matchID == -1: + return - # Set usertoken match to -1 - leavingMatchID = self.matchID - self.matchID = -1 + # Part #multiplayer channel and streams (/ and /playing) + chat.partChannel(token=self, channel="#multi_{}".format(self.matchID), kick=True) + self.leaveStream("multi/{}".format(self.matchID)) + self.leaveStream("multi/{}/playing".format(self.matchID)) # optional - # Make sure the match exists - if leavingMatchID not in glob.matches.matches: - return + # Set usertoken match to -1 + leavingMatchID = self.matchID + self.matchID = -1 - # The match exists, get object - match = glob.matches.matches[leavingMatchID] + # Make sure the match exists + if leavingMatchID not in glob.matches.matches: + return - # Set slot to free - match.userLeft(self) + # The match exists, get object + match = glob.matches.matches[leavingMatchID] + + # Set slot to free + match.userLeft(self) + finally: + self._multiplayerLock.release() def kick(self, message="You have been kicked from the server. Please login again.", reason="kick"): """ @@ -350,14 +387,19 @@ class token: :param reason: Kick reason, used in logs. Default: "kick" :return: """ - # Send packet to target - log.info("{} has been disconnected. ({})".format(self.username, reason)) - if message != "": - self.enqueue(serverPackets.notification(message)) - self.enqueue(serverPackets.loginFailed()) + try: + self._internalLock.acquire() - # Logout event - logoutEvent.handle(self, deleteToken=self.irc) + # Send packet to target + log.info("{} has been disconnected. ({})".format(self.username, reason)) + if message != "": + self.enqueue(serverPackets.notification(message)) + self.enqueue(serverPackets.loginFailed()) + + # Logout event + logoutEvent.handle(self, deleteToken=self.irc) + finally: + self._internalLock.release() def silence(self, seconds = None, reason = "", author = 999): """ @@ -368,21 +410,26 @@ class token: :param author: userID of who has silenced the user. Default: 999 (FokaBot) :return: """ - if seconds is None: - # Get silence expire from db if needed - seconds = max(0, userUtils.getSilenceEnd(self.userID) - int(time.time())) - else: - # Silence in db and token - userUtils.silence(self.userID, seconds, reason, author) + try: + self._chatLock.acquire() - # Silence token - self.silenceEndTime = int(time.time()) + seconds + if seconds is None: + # Get silence expire from db if needed + seconds = max(0, userUtils.getSilenceEnd(self.userID) - int(time.time())) + else: + # Silence in db and token + userUtils.silence(self.userID, seconds, reason, author) - # Send silence packet to user - self.enqueue(serverPackets.silenceEndTime(seconds)) + # Silence token + self.silenceEndTime = int(time.time()) + seconds - # Send silenced packet to everyone else - glob.streams.broadcast("main", serverPackets.userSilenced(self.userID)) + # Send silence packet to user + self.enqueue(serverPackets.silenceEndTime(seconds)) + + # Send silenced packet to everyone else + glob.streams.broadcast("main", serverPackets.userSilenced(self.userID)) + finally: + self._chatLock.release() def spamProtection(self, increaseSpamRate = True): """ @@ -391,13 +438,18 @@ class token: :param increaseSpamRate: set to True if the user has sent a new message. Default: True :return: """ - # Increase the spam rate if needed - if increaseSpamRate: - self.spamRate += 1 + try: + self._chatLock.acquire() - # Silence the user if needed - if self.spamRate > 10: - self.silence(1800, "Spamming (auto spam protection)") + # Increase the spam rate if needed + if increaseSpamRate: + self.spamRate += 1 + + # Silence the user if needed + if self.spamRate > 10: + self.silence(1800, "Spamming (auto spam protection)") + finally: + self._chatLock.release() def isSilenced(self): """ @@ -422,17 +474,22 @@ class token: :return: """ - stats = userUtils.getUserStats(self.userID, self.gameMode) - log.debug(str(stats)) - if stats is None: - log.warning("Stats query returned None") - return - self.rankedScore = stats["rankedScore"] - self.accuracy = stats["accuracy"]/100 - self.playcount = stats["playcount"] - self.totalScore = stats["totalScore"] - self.gameRank = stats["gameRank"] - self.pp = stats["pp"] + try: + self._internalLock.acquire() + + stats = userUtils.getUserStats(self.userID, self.gameMode) + log.debug(str(stats)) + if stats is None: + log.warning("Stats query returned None") + return + self.rankedScore = stats["rankedScore"] + self.accuracy = stats["accuracy"]/100 + self.playcount = stats["playcount"] + self.totalScore = stats["totalScore"] + self.gameRank = stats["gameRank"] + self.pp = stats["pp"] + finally: + self._internalLock.release() def checkRestricted(self): """ @@ -484,9 +541,13 @@ class token: :param name: stream name :return: """ - glob.streams.join(name, token=self.token) - if name not in self.streams: - self.streams.append(name) + try: + self._streamsLock.acquire() + glob.streams.join(name, token=self.token) + if name not in self.streams: + self.streams.append(name) + finally: + self._streamsLock.release() def leaveStream(self, name): """ @@ -495,9 +556,13 @@ class token: :param name: stream name :return: """ - glob.streams.leave(name, token=self.token) - if name in self.streams: - self.streams.remove(name) + try: + self._streamsLock.acquire() + glob.streams.leave(name, token=self.token) + if name in self.streams: + self.streams.remove(name) + finally: + self._streamsLock.release() def leaveAllStreams(self): """