pep.py/objects/tokenList.py

169 lines
3.8 KiB
Python
Raw Normal View History

2016-05-18 17:12:46 +00:00
from objects import osuToken
2016-04-19 17:40:59 +00:00
import time
import threading
2016-05-18 17:12:46 +00:00
from events import logoutEvent
from helpers import logHelper as log
from constants import serverPackets
from helpers import userHelper
2016-04-19 17:40:59 +00:00
class tokenList:
    """
    List of connected osu tokens

    tokens -- dictionary. key: token string, value: token object
    """

    def __init__(self):
        # Per-instance dictionary. A class-level `tokens = {}` would be a
        # single dict object shared by every tokenList instance.
        self.tokens = {}

    def addToken(self, __userID):
        """
        Create a token object for a user and add it to tokens list

        __userID -- user id associated to that token
        return -- token object
        """
        newToken = osuToken.token(__userID)
        self.tokens[newToken.token] = newToken
        return newToken

    def deleteToken(self, __token):
        """
        Delete a token from token list if it exists

        __token -- token string
        """
        # pop() with a default does nothing when the token is unknown
        self.tokens.pop(__token, None)

    def getUserIDFromToken(self, __token):
        """
        Get user ID from a token

        __token -- token to find
        return -- False if not found, userID if found
        """
        # Make sure the token exists
        if __token not in self.tokens:
            return False

        # Get userID associated to that token
        return self.tokens[__token].userID

    def getTokenFromUserID(self, __userID):
        """
        Get token from a user ID

        __userID -- user ID to find
        return -- None if not found, token object if found
                  (the first matching token if the user has more than one)
        """
        for value in self.tokens.values():
            if value.userID == __userID:
                return value

        # Return None if not found
        return None

    def getTokenFromUsername(self, __username):
        """
        Get token from a username (case insensitive)

        __username -- username to find
        return -- None if not found, token object if found
        """
        # Compare lowercase usernames
        who = __username.lower()
        for value in self.tokens.values():
            if value.username.lower() == who:
                return value

        # Return None if not found
        return None

    def deleteOldTokens(self, __userID):
        """
        Delete every token associated to a user ID, if any

        __userID -- tokens associated to this user will be deleted
        """
        # Collect the keys first: removing entries while iterating over the
        # dictionary is not allowed, and stopping at the first match would
        # leave the user's other tokens in the list
        oldKeys = [key for key, value in self.tokens.items() if value.userID == __userID]
        for key in oldKeys:
            self.tokens.pop(key, None)

    def multipleEnqueue(self, __packet, __who, __but = False):
        """
        Enqueue a packet to multiple users

        __packet -- packet bytes to enqueue
        __who -- userIDs array
        __but -- if True, enqueue to everyone but users in __who array
        """
        exclude = bool(__but)
        for value in self.tokens.values():
            # exclude False: enqueue to users in __who
            # exclude True: enqueue to users NOT in __who
            if (value.userID in __who) != exclude:
                value.enqueue(__packet)

    def enqueueAll(self, __packet):
        """
        Enqueue packet(s) to every connected user

        __packet -- packet bytes to enqueue
        """
        for value in self.tokens.values():
            value.enqueue(__packet)

    def usersTimeoutCheckLoop(self, __timeoutTime = 100, __checkTime = 100):
        """
        Log out all timed out users.
        If called once, will recall after __checkTime seconds and so on, forever
        CALL THIS FUNCTION ONLY ONCE!

        __timeoutTime -- seconds of inactivity required to disconnect someone (Default: 100)
        __checkTime -- seconds between loops (Default: 100)
        """
        try:
            timeoutLimit = time.time()-__timeoutTime

            # Iterate over a snapshot, so tokens added or removed while the
            # check is running do not invalidate the iteration.
            # The user with ID 999 (fokabot) never times out.
            timedOutTokens = [
                key for key, value in list(self.tokens.items())
                if value.pingTime < timeoutLimit and value.userID != 999
            ]

            # Log out timed out users. key is the token string (dictionary key)
            # NOTE(review): logoutEvent.handle is expected to remove the token
            # from self.tokens -- confirm against events/logoutEvent
            for key in timedOutTokens:
                token = self.tokens.get(key)
                if token is None:
                    # Token disappeared after the snapshot was taken
                    continue
                logoutEvent.handle(token, None)
        finally:
            # Schedule a new check (endless loop). Done in finally so that an
            # error during one check does not stop all the future checks.
            threading.Timer(__checkTime, self.usersTimeoutCheckLoop, [__timeoutTime, __checkTime]).start()