Add various locks in osuToken object

This commit is contained in:
Giuseppe Guerra 2017-08-11 00:45:44 +02:00
parent aa1887e2c4
commit 466004f239
3 changed files with 228 additions and 159 deletions

View File

@ -8,6 +8,10 @@ def handle(userToken, packetData):
# Start spectating packet # Start spectating packet
packetData = clientPackets.startSpectating(packetData) packetData = clientPackets.startSpectating(packetData)
# If the user id is less than 0, treat this as a stop spectating packet
if packetData["userID"] < 0:
userToken.stopSpectating()
# Get host token # Get host token
targetToken = glob.tokens.getTokenFromUserID(packetData["userID"]) targetToken = glob.tokens.getTokenFromUserID(packetData["userID"])
if targetToken is None: if targetToken is None:

View File

@ -93,7 +93,7 @@ class handler(requestsManager.asyncRequestHandler):
# Token exists, get its object and lock it # Token exists, get its object and lock it
userToken = glob.tokens.tokens[requestTokenString] userToken = glob.tokens.tokens[requestTokenString]
userToken.lock.acquire() userToken.processingLock.acquire()
# Keep reading packets until everything has been read # Keep reading packets until everything has been read
while pos < len(requestData): while pos < len(requestData):
@ -205,8 +205,8 @@ class handler(requestsManager.asyncRequestHandler):
if userToken is not None: if userToken is not None:
# Update ping time for timeout # Update ping time for timeout
userToken.updatePingTime() userToken.updatePingTime()
# Release token lock # Release processing lock
userToken.lock.release() userToken.processingLock.release()
# Delete token if kicked # Delete token if kicked
if userToken.kicked: if userToken.kicked:
glob.tokens.deleteToken(userToken) glob.tokens.deleteToken(userToken)

View File

@ -39,8 +39,6 @@ class token:
self.loginTime = int(time.time()) self.loginTime = int(time.time())
self.pingTime = self.loginTime self.pingTime = self.loginTime
self.timeOffset = timeOffset self.timeOffset = timeOffset
self.lock = threading.Lock()
self.bufferLock = threading.Lock()
self.streams = [] self.streams = []
self.tournament = tournament self.tournament = tournament
self.messagesBuffer = [] self.messagesBuffer = []
@ -87,6 +85,15 @@ class token:
else: else:
self.token = str(uuid.uuid4()) self.token = str(uuid.uuid4())
# Locks
self.processingLock = threading.Lock() # Acquired while there's an incoming packet from this user
self._bufferLock = threading.Lock() # Acquired while writing to packets buffer
self._internalLock = threading.Lock() # Acquired while performing internal operations on this token
self._chatLock = threading.Lock() # Acquired while performing chat operations
self._streamsLock = threading.Lock() # Acquired while joining/leaving streams
self._spectatorLock = threading.Lock() # Acquired while starting/stopping spectating
self._multiplayerLock = threading.Lock()# Acquired while joining/leaving streams
# Set stats # Set stats
self.updateCachedStats() self.updateCachedStats()
@ -105,7 +112,7 @@ class token:
""" """
try: try:
# Acquire the buffer lock # Acquire the buffer lock
self.bufferLock.acquire() self._bufferLock.acquire()
# Never enqueue for IRC clients or Foka # Never enqueue for IRC clients or Foka
if self.irc or self.userID < 999: if self.irc or self.userID < 999:
@ -118,15 +125,15 @@ class token:
log.warning("{}'s packets buffer is above 10M!! Lost some data!".format(self.username)) log.warning("{}'s packets buffer is above 10M!! Lost some data!".format(self.username))
finally: finally:
# Release the buffer lock # Release the buffer lock
self.bufferLock.release() self._bufferLock.release()
def resetQueue(self): def resetQueue(self):
"""Resets the queue. Call when enqueued packets have been sent""" """Resets the queue. Call when enqueued packets have been sent"""
try: try:
self.bufferLock.acquire() self._bufferLock.acquire()
self.queue = bytes() self.queue = bytes()
finally: finally:
self.bufferLock.release() self._bufferLock.release()
def joinChannel(self, channelObject): def joinChannel(self, channelObject):
""" """
@ -136,13 +143,17 @@ class token:
:raises: exceptions.userAlreadyInChannelException() :raises: exceptions.userAlreadyInChannelException()
exceptions.channelNoPermissionsException() exceptions.channelNoPermissionsException()
""" """
if channelObject.name in self.joinedChannels: try:
raise exceptions.userAlreadyInChannelException() self._chatLock.acquire()
if channelObject.publicRead == False and self.admin == False: if channelObject.name in self.joinedChannels:
raise exceptions.channelNoPermissionsException() raise exceptions.userAlreadyInChannelException()
self.joinedChannels.append(channelObject.name) if channelObject.publicRead == False and self.admin == False:
self.joinStream("chat/{}".format(channelObject.name)) raise exceptions.channelNoPermissionsException()
self.enqueue(serverPackets.channelJoinSuccess(self.userID, channelObject.clientName)) self.joinedChannels.append(channelObject.name)
self.joinStream("chat/{}".format(channelObject.name))
self.enqueue(serverPackets.channelJoinSuccess(self.userID, channelObject.clientName))
finally:
self._chatLock.release()
def partChannel(self, channelObject): def partChannel(self, channelObject):
""" """
@ -150,8 +161,12 @@ class token:
:param channelObject: channel object :param channelObject: channel object
""" """
self.joinedChannels.remove(channelObject.name) try:
self.leaveStream("chat/{}".format(channelObject.name)) self._chatLock.acquire()
self.joinedChannels.remove(channelObject.name)
self.leaveStream("chat/{}".format(channelObject.name))
finally:
self._chatLock.release()
def setLocation(self, latitude, longitude): def setLocation(self, latitude, longitude):
""" """
@ -185,42 +200,48 @@ class token:
:param host: host osuToken object :param host: host osuToken object
""" """
# Stop spectating old client try:
self.stopSpectating() # Stop spectating old client
self.stopSpectating()
# Set new spectator host # Acquire token's lock
self.spectating = host.token self._spectatorLock.acquire()
self.spectatingUserID = host.userID
# Add us to host's spectator list # Set new spectator host
host.spectators.append(self.token) self.spectating = host.token
self.spectatingUserID = host.userID
# Create and join spectator stream # Add us to host's spectator list
streamName = "spect/{}".format(host.userID) host.spectators.append(self.token)
glob.streams.add(streamName)
self.joinStream(streamName)
host.joinStream(streamName)
# Send spectator join packet to host # Create and join spectator stream
host.enqueue(serverPackets.addSpectator(self.userID)) streamName = "spect/{}".format(host.userID)
glob.streams.add(streamName)
self.joinStream(streamName)
host.joinStream(streamName)
# Create and join #spectator (#spect_userid) channel # Send spectator join packet to host
glob.channels.addTempChannel("#spect_{}".format(host.userID)) host.enqueue(serverPackets.addSpectator(self.userID))
chat.joinChannel(token=self, channel="#spect_{}".format(host.userID))
if len(host.spectators) == 1:
# First spectator, send #spectator join to host too
chat.joinChannel(token=host, channel="#spect_{}".format(host.userID))
# Send fellow spectator join to all clients # Create and join #spectator (#spect_userid) channel
glob.streams.broadcast(streamName, serverPackets.fellowSpectatorJoined(self.userID)) glob.channels.addTempChannel("#spect_{}".format(host.userID))
chat.joinChannel(token=self, channel="#spect_{}".format(host.userID))
if len(host.spectators) == 1:
# First spectator, send #spectator join to host too
chat.joinChannel(token=host, channel="#spect_{}".format(host.userID))
# Get current spectators list # Send fellow spectator join to all clients
for i in host.spectators: glob.streams.broadcast(streamName, serverPackets.fellowSpectatorJoined(self.userID))
if i != self.token and i in glob.tokens.tokens:
self.enqueue(serverPackets.fellowSpectatorJoined(glob.tokens.tokens[i].userID))
# Log # Get current spectators list
log.info("{} is spectating {}".format(self.username, host.username)) for i in host.spectators:
if i != self.token and i in glob.tokens.tokens:
self.enqueue(serverPackets.fellowSpectatorJoined(glob.tokens.tokens[i].userID))
# Log
log.info("{} is spectating {}".format(self.username, host.username))
finally:
self._spectatorLock.release()
def stopSpectating(self): def stopSpectating(self):
""" """
@ -229,43 +250,49 @@ class token:
:return: :return:
""" """
# Remove our userID from host's spectators try:
if self.spectating is None: # Acquire token lock
return self._spectatorLock.acquire()
if self.spectating in glob.tokens.tokens:
hostToken = glob.tokens.tokens[self.spectating]
else:
hostToken = None
streamName = "spect/{}".format(self.spectatingUserID)
# Remove us from host's spectators list, # Remove our userID from host's spectators
# leave spectator stream if self.spectating is None:
# and end the spectator left packet to host return
self.leaveStream(streamName) if self.spectating in glob.tokens.tokens:
if hostToken is not None: hostToken = glob.tokens.tokens[self.spectating]
hostToken.spectators.remove(self.token) else:
hostToken.enqueue(serverPackets.removeSpectator(self.userID)) hostToken = None
streamName = "spect/{}".format(self.spectatingUserID)
# and to all other spectators # Remove us from host's spectators list,
for i in hostToken.spectators: # leave spectator stream
if i in glob.tokens.tokens: # and end the spectator left packet to host
glob.tokens.tokens[i].enqueue(serverPackets.fellowSpectatorLeft(self.userID)) self.leaveStream(streamName)
if hostToken is not None:
hostToken.spectators.remove(self.token)
hostToken.enqueue(serverPackets.removeSpectator(self.userID))
# If nobody is spectating the host anymore, close #spectator channel # and to all other spectators
# and remove host from spect stream too for i in hostToken.spectators:
if len(hostToken.spectators) == 0: if i in glob.tokens.tokens:
chat.partChannel(token=hostToken, channel="#spect_{}".format(hostToken.userID), kick=True) glob.tokens.tokens[i].enqueue(serverPackets.fellowSpectatorLeft(self.userID))
hostToken.leaveStream(streamName)
# Console output # If nobody is spectating the host anymore, close #spectator channel
log.info("{} is no longer spectating {}. Current spectators: {}".format(self.username, self.spectatingUserID, hostToken.spectators)) # and remove host from spect stream too
if len(hostToken.spectators) == 0:
chat.partChannel(token=hostToken, channel="#spect_{}".format(hostToken.userID), kick=True)
hostToken.leaveStream(streamName)
# Part #spectator channel # Console output
chat.partChannel(token=self, channel="#spect_{}".format(self.spectatingUserID), kick=True) log.info("{} is no longer spectating {}. Current spectators: {}".format(self.username, self.spectatingUserID, hostToken.spectators))
# Set our spectating user to 0 # Part #spectator channel
self.spectating = None chat.partChannel(token=self, channel="#spect_{}".format(self.spectatingUserID), kick=True)
self.spectatingUserID = 0
# Set our spectating user to 0
self.spectating = None
self.spectatingUserID = 0
finally:
self._spectatorLock.release()
def updatePingTime(self): def updatePingTime(self):
""" """
@ -282,35 +309,40 @@ class token:
:param matchID: new match ID :param matchID: new match ID
:return: :return:
""" """
# Make sure the match exists try:
if matchID not in glob.matches.matches: self._multiplayerLock.acquire()
return
# Match exists, get object # Make sure the match exists
match = glob.matches.matches[matchID] if matchID not in glob.matches.matches:
return
# Stop spectating # Match exists, get object
self.stopSpectating() match = glob.matches.matches[matchID]
# Leave other matches # Stop spectating
if self.matchID > -1 and self.matchID != matchID: self.stopSpectating()
self.leaveMatch()
# Try to join match # Leave other matches
joined = match.userJoin(self) if self.matchID > -1 and self.matchID != matchID:
if not joined: self.leaveMatch()
self.enqueue(serverPackets.matchJoinFail())
return
# Set matchID, join stream, channel and send packet # Try to join match
self.matchID = matchID joined = match.userJoin(self)
self.joinStream(match.streamName) if not joined:
chat.joinChannel(token=self, channel="#multi_{}".format(self.matchID)) self.enqueue(serverPackets.matchJoinFail())
self.enqueue(serverPackets.matchJoinSuccess(matchID)) return
# Alert the user if we have just joined a tourney match # Set matchID, join stream, channel and send packet
if match.isTourney: self.matchID = matchID
self.enqueue(serverPackets.notification("You are now in a tournament match.")) self.joinStream(match.streamName)
chat.joinChannel(token=self, channel="#multi_{}".format(self.matchID))
self.enqueue(serverPackets.matchJoinSuccess(matchID))
# Alert the user if we have just joined a tourney match
if match.isTourney:
self.enqueue(serverPackets.notification("You are now in a tournament match."))
finally:
self._multiplayerLock.release()
def leaveMatch(self): def leaveMatch(self):
""" """
@ -318,28 +350,33 @@ class token:
:return: :return:
""" """
# Make sure we are in a match try:
if self.matchID == -1: self._multiplayerLock.acquire()
return
# Part #multiplayer channel and streams (/ and /playing) # Make sure we are in a match
chat.partChannel(token=self, channel="#multi_{}".format(self.matchID), kick=True) if self.matchID == -1:
self.leaveStream("multi/{}".format(self.matchID)) return
self.leaveStream("multi/{}/playing".format(self.matchID)) # optional
# Set usertoken match to -1 # Part #multiplayer channel and streams (/ and /playing)
leavingMatchID = self.matchID chat.partChannel(token=self, channel="#multi_{}".format(self.matchID), kick=True)
self.matchID = -1 self.leaveStream("multi/{}".format(self.matchID))
self.leaveStream("multi/{}/playing".format(self.matchID)) # optional
# Make sure the match exists # Set usertoken match to -1
if leavingMatchID not in glob.matches.matches: leavingMatchID = self.matchID
return self.matchID = -1
# The match exists, get object # Make sure the match exists
match = glob.matches.matches[leavingMatchID] if leavingMatchID not in glob.matches.matches:
return
# Set slot to free # The match exists, get object
match.userLeft(self) match = glob.matches.matches[leavingMatchID]
# Set slot to free
match.userLeft(self)
finally:
self._multiplayerLock.release()
def kick(self, message="You have been kicked from the server. Please login again.", reason="kick"): def kick(self, message="You have been kicked from the server. Please login again.", reason="kick"):
""" """
@ -350,14 +387,19 @@ class token:
:param reason: Kick reason, used in logs. Default: "kick" :param reason: Kick reason, used in logs. Default: "kick"
:return: :return:
""" """
# Send packet to target try:
log.info("{} has been disconnected. ({})".format(self.username, reason)) self._internalLock.acquire()
if message != "":
self.enqueue(serverPackets.notification(message))
self.enqueue(serverPackets.loginFailed())
# Logout event # Send packet to target
logoutEvent.handle(self, deleteToken=self.irc) log.info("{} has been disconnected. ({})".format(self.username, reason))
if message != "":
self.enqueue(serverPackets.notification(message))
self.enqueue(serverPackets.loginFailed())
# Logout event
logoutEvent.handle(self, deleteToken=self.irc)
finally:
self._internalLock.release()
def silence(self, seconds = None, reason = "", author = 999): def silence(self, seconds = None, reason = "", author = 999):
""" """
@ -368,21 +410,26 @@ class token:
:param author: userID of who has silenced the user. Default: 999 (FokaBot) :param author: userID of who has silenced the user. Default: 999 (FokaBot)
:return: :return:
""" """
if seconds is None: try:
# Get silence expire from db if needed self._chatLock.acquire()
seconds = max(0, userUtils.getSilenceEnd(self.userID) - int(time.time()))
else:
# Silence in db and token
userUtils.silence(self.userID, seconds, reason, author)
# Silence token if seconds is None:
self.silenceEndTime = int(time.time()) + seconds # Get silence expire from db if needed
seconds = max(0, userUtils.getSilenceEnd(self.userID) - int(time.time()))
else:
# Silence in db and token
userUtils.silence(self.userID, seconds, reason, author)
# Send silence packet to user # Silence token
self.enqueue(serverPackets.silenceEndTime(seconds)) self.silenceEndTime = int(time.time()) + seconds
# Send silenced packet to everyone else # Send silence packet to user
glob.streams.broadcast("main", serverPackets.userSilenced(self.userID)) self.enqueue(serverPackets.silenceEndTime(seconds))
# Send silenced packet to everyone else
glob.streams.broadcast("main", serverPackets.userSilenced(self.userID))
finally:
self._chatLock.release()
def spamProtection(self, increaseSpamRate = True): def spamProtection(self, increaseSpamRate = True):
""" """
@ -391,13 +438,18 @@ class token:
:param increaseSpamRate: set to True if the user has sent a new message. Default: True :param increaseSpamRate: set to True if the user has sent a new message. Default: True
:return: :return:
""" """
# Increase the spam rate if needed try:
if increaseSpamRate: self._chatLock.acquire()
self.spamRate += 1
# Silence the user if needed # Increase the spam rate if needed
if self.spamRate > 10: if increaseSpamRate:
self.silence(1800, "Spamming (auto spam protection)") self.spamRate += 1
# Silence the user if needed
if self.spamRate > 10:
self.silence(1800, "Spamming (auto spam protection)")
finally:
self._chatLock.release()
def isSilenced(self): def isSilenced(self):
""" """
@ -422,17 +474,22 @@ class token:
:return: :return:
""" """
stats = userUtils.getUserStats(self.userID, self.gameMode) try:
log.debug(str(stats)) self._internalLock.acquire()
if stats is None:
log.warning("Stats query returned None") stats = userUtils.getUserStats(self.userID, self.gameMode)
return log.debug(str(stats))
self.rankedScore = stats["rankedScore"] if stats is None:
self.accuracy = stats["accuracy"]/100 log.warning("Stats query returned None")
self.playcount = stats["playcount"] return
self.totalScore = stats["totalScore"] self.rankedScore = stats["rankedScore"]
self.gameRank = stats["gameRank"] self.accuracy = stats["accuracy"]/100
self.pp = stats["pp"] self.playcount = stats["playcount"]
self.totalScore = stats["totalScore"]
self.gameRank = stats["gameRank"]
self.pp = stats["pp"]
finally:
self._internalLock.release()
def checkRestricted(self): def checkRestricted(self):
""" """
@ -484,9 +541,13 @@ class token:
:param name: stream name :param name: stream name
:return: :return:
""" """
glob.streams.join(name, token=self.token) try:
if name not in self.streams: self._streamsLock.acquire()
self.streams.append(name) glob.streams.join(name, token=self.token)
if name not in self.streams:
self.streams.append(name)
finally:
self._streamsLock.release()
def leaveStream(self, name): def leaveStream(self, name):
""" """
@ -495,9 +556,13 @@ class token:
:param name: stream name :param name: stream name
:return: :return:
""" """
glob.streams.leave(name, token=self.token) try:
if name in self.streams: self._streamsLock.acquire()
self.streams.remove(name) glob.streams.leave(name, token=self.token)
if name in self.streams:
self.streams.remove(name)
finally:
self._streamsLock.release()
def leaveAllStreams(self): def leaveAllStreams(self):
""" """