This repository has been archived on 2022-02-23. You can view files and clone it, but cannot push or open issues or pull requests.
lets/objects/beatmap.pyx
2018-12-09 00:15:56 -05:00

340 lines
12 KiB
Cython

import time
from common.log import logUtils as log
from constants import rankedStatuses
from helpers import osuapiHelper
from objects import glob
class beatmap:
    """
    A single beatmap and the metadata the score server keeps about it.

    Data is read from the `beatmaps` table when a usable row exists there and
    from osu!api otherwise (see setData).
    """
    __slots__ = ["songName", "fileMD5", "rankedStatus", "rankedStatusFrozen", "beatmapID", "beatmapSetID", "offset",
                 "rating", "starsStd", "starsTaiko", "starsCtb", "starsMania", "AR", "OD", "maxCombo", "hitLength",
                 "bpm", "playcount", "passcount", "refresh"]

    def __init__(self, md5 = None, beatmapSetID = None, gameMode = 0, refresh=False):
        """
        Initialize a beatmap object.

        md5 -- beatmap md5. Optional.
        beatmapSetID -- beatmapSetID. Optional.
        gameMode -- accepted for backward compatibility, not used by this class.
        refresh -- if True, setData ignores a valid db row and asks osu!api again.

        Data is loaded (through setData) only when both md5 and beatmapSetID
        are given; otherwise the object keeps the defaults set below.
        """
        self.songName = ""
        self.fileMD5 = ""
        self.rankedStatus = rankedStatuses.NOT_SUBMITTED
        self.rankedStatusFrozen = 0
        self.beatmapID = 0
        self.beatmapSetID = 0
        self.offset = 0		# Won't implement
        self.rating = 10.0 	# Won't implement

        self.starsStd = 0.0		# stars for converted
        self.starsTaiko = 0.0	# stars for converted
        self.starsCtb = 0.0		# stars for converted
        self.starsMania = 0.0	# stars for converted
        self.AR = 0.0
        self.OD = 0.0
        self.maxCombo = 0
        self.hitLength = 0
        self.bpm = 0

        # Statistics for ranking panel.
        # Both counters must be assigned here: the class uses __slots__, so a
        # slot that was never assigned raises AttributeError when read.
        self.playcount = 0
        self.passcount = 0

        # Force refresh from osu api
        self.refresh = refresh

        if md5 is not None and beatmapSetID is not None:
            self.setData(md5, beatmapSetID)

    def addBeatmapToDB(self):
        """
        Save current beatmap data in db, replacing the old row if there is one.

        An existing row (matched by md5 or beatmap id) is deleted and inserted
        again; its frozen flag is carried over to the new row.
        """
        # Make sure the beatmap is not already in db
        bdata = glob.db.fetch("SELECT id, ranked_status_freezed, ranked FROM beatmaps WHERE beatmap_md5 = %s OR beatmap_id = %s LIMIT 1", [self.fileMD5, self.beatmapID])
        if bdata is not None:
            # This beatmap is already in db, remove old record
            # Get current frozen status
            frozen = bdata["ranked_status_freezed"]
            if frozen == 1:
                # Frozen maps keep the status stored in db
                self.rankedStatus = bdata["ranked"]
            log.debug("Deleting old beatmap data ({})".format(bdata["id"]))
            glob.db.execute("DELETE FROM beatmaps WHERE id = %s LIMIT 1", [bdata["id"]])
        else:
            # Unfreeze beatmap status
            frozen = 0

        # Add new beatmap data
        log.debug("Saving beatmap data in db...")
        glob.db.execute("INSERT INTO `beatmaps` (`id`, `beatmap_id`, `beatmapset_id`, `beatmap_md5`, `song_name`, `ar`, `od`, `difficulty_std`, `difficulty_taiko`, `difficulty_ctb`, `difficulty_mania`, `max_combo`, `hit_length`, `bpm`, `ranked`, `latest_update`, `ranked_status_freezed`) VALUES (NULL, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);", [
            self.beatmapID,
            self.beatmapSetID,
            self.fileMD5,
            # Drop anything that can't be encoded as UTF-8 before saving
            self.songName.encode("utf-8", "ignore").decode("utf-8"),
            self.AR,
            self.OD,
            self.starsStd,
            self.starsTaiko,
            self.starsCtb,
            self.starsMania,
            self.maxCombo,
            self.hitLength,
            self.bpm,
            # NOTE(review): any row that is not unfrozen is written with the
            # literal status 2, whatever status was stored before -- presumably
            # 2 is rankedStatuses.RANKED; confirm this is intended.
            self.rankedStatus if frozen == 0 else 2,
            int(time.time()),
            frozen
        ])

    def setDataFromDB(self, md5):
        """
        Set this object's beatmap data from db.

        md5 -- beatmap md5
        return -- True if set, False if the row is missing, has no non-std
                  difficulties or is too old (the caller then asks osu!api)
        """
        # Get data from DB
        data = glob.db.fetch("SELECT * FROM beatmaps WHERE beatmap_md5 = %s LIMIT 1", [md5])

        # Make sure the query returned something
        if data is None:
            return False

        # Make sure the beatmap is not an old one
        if data["difficulty_taiko"] == 0 and data["difficulty_ctb"] == 0 and data["difficulty_mania"] == 0:
            log.debug("Difficulty for non-std gamemodes not found in DB, refreshing data from osu!api...")
            return False

        # Set cached data period
        expire = int(glob.conf.config["server"]["beatmapcacheexpire"])

        # If the beatmap is ranked, we don't need to refresh data from osu!api that often
        if data["ranked"] >= rankedStatuses.RANKED and data["ranked_status_freezed"] == 0:
            expire *= 3

        # Make sure the beatmap data in db is not too old
        # (an expire value of 0 or less disables the check)
        if expire > 0 and time.time() > data["latest_update"] + expire:
            if data["ranked_status_freezed"] == 1:
                # Load the frozen row anyway, so setDataFromOsuApi can see
                # rankedStatusFrozen and keep the stored data
                self.setDataFromDict(data)
            return False

        # Data in DB, set beatmap data
        log.debug("Got beatmap data from db")
        self.setDataFromDict(data)
        return True

    def setDataFromDict(self, data):
        """
        Set this object's beatmap data from data dictionary.

        data -- data dictionary, keyed like a row of the `beatmaps` table.
                "playcount" and "passcount" are optional and default to 0.
        return -- nothing
        """
        self.songName = data["song_name"]
        self.fileMD5 = data["beatmap_md5"]
        self.rankedStatus = int(data["ranked"])
        self.rankedStatusFrozen = int(data["ranked_status_freezed"])
        self.beatmapID = int(data["beatmap_id"])
        self.beatmapSetID = int(data["beatmapset_id"])
        self.AR = float(data["ar"])
        self.OD = float(data["od"])
        self.starsStd = float(data["difficulty_std"])
        self.starsTaiko = float(data["difficulty_taiko"])
        self.starsCtb = float(data["difficulty_ctb"])
        self.starsMania = float(data["difficulty_mania"])
        self.maxCombo = int(data["max_combo"])
        self.hitLength = int(data["hit_length"])
        self.bpm = int(data["bpm"])

        # Ranking panel statistics
        self.playcount = int(data["playcount"]) if "playcount" in data else 0
        self.passcount = int(data["passcount"]) if "passcount" in data else 0

    @staticmethod
    def _requestAllModes(selector):
        """
        Send one get_beatmaps request per game mode (m=0..3) to osu!api.

        selector -- query fragment that picks the beatmap ("h=<md5>" or "s=<set id>")
        return -- list with the four responses, ordered std, taiko, ctb, mania
        """
        return [
            osuapiHelper.osuApiRequest("get_beatmaps", "{}&a=1&m={}".format(selector, mode))
            for mode in range(4)
        ]

    @staticmethod
    def _firstAvailable(*responses):
        """
        Return the first response that is not None, or None if all of them are.
        """
        for response in responses:
            if response is not None:
                return response
        return None

    def setDataFromOsuApi(self, md5, beatmapSetID):
        """
        Set this object's beatmap data from osu!api.

        md5 -- beatmap md5
        beatmapSetID -- beatmap set ID, used to check if a map is outdated
        return -- True if set, False if not set
        """
        # Look the beatmap up by md5, once per game mode
        dataStd, dataTaiko, dataCtb, dataMania = self._requestAllModes("h={}".format(md5))
        mainData = self._firstAvailable(dataStd, dataTaiko, dataCtb, dataMania)

        # If the beatmap is frozen and still valid from osu!api, return True so we don't overwrite anything
        if mainData is not None and self.rankedStatusFrozen == 1:
            return True

        # Can't find beatmap by MD5. The beatmap has been updated. Check with beatmap set ID
        if mainData is None:
            log.debug("osu!api data is None")
            setData = self._firstAvailable(*self._requestAllModes("s={}".format(beatmapSetID)))
            if setData is None:
                # Still no data, beatmap is not submitted
                return False

            # We have some data, but md5 doesn't match. Beatmap is outdated
            self.rankedStatus = rankedStatuses.NEED_UPDATE
            return True

        # We have data from osu!api, set beatmap data
        log.debug("Got beatmap data from osu!api")
        self.songName = "{} - {} [{}]".format(mainData["artist"], mainData["title"], mainData["version"])
        self.fileMD5 = md5
        self.rankedStatus = convertRankedStatus(int(mainData["approved"]))
        self.beatmapID = int(mainData["beatmap_id"])
        self.beatmapSetID = int(mainData["beatmapset_id"])
        self.AR = float(mainData["diff_approach"])
        self.OD = float(mainData["diff_overall"])

        # Determine stars for every mode (0.0 when that mode has no data)
        self.starsStd = float(dataStd["difficultyrating"]) if dataStd is not None else 0.0
        self.starsTaiko = float(dataTaiko["difficultyrating"]) if dataTaiko is not None else 0.0
        self.starsCtb = float(dataCtb["difficultyrating"]) if dataCtb is not None else 0.0
        self.starsMania = float(dataMania["difficultyrating"]) if dataMania is not None else 0.0

        self.maxCombo = int(mainData["max_combo"]) if mainData["max_combo"] is not None else 0
        self.hitLength = int(mainData["hit_length"])
        if mainData["bpm"] is not None:
            # Go through float first, so a value like "180.5" is accepted
            self.bpm = int(float(mainData["bpm"]))
        else:
            self.bpm = -1
        return True

    def setData(self, md5, beatmapSetID):
        """
        Set this object's beatmap data from highest level possible.

        md5 -- beatmap MD5
        beatmapSetID -- beatmap set ID
        """
        # Get beatmap from db
        dbResult = self.setDataFromDB(md5)

        # Force refresh from osu api.
        # We get data before to keep frozen maps ranked
        # if they haven't been updated
        if dbResult and self.refresh:
            dbResult = False

        if not dbResult:
            log.debug("Beatmap not found in db")
            # If this beatmap is not in db, get it from osu!api
            apiResult = self.setDataFromOsuApi(md5, beatmapSetID)
            if not apiResult:
                # If it's not even in osu!api, this beatmap is not submitted
                self.rankedStatus = rankedStatuses.NOT_SUBMITTED
            elif self.rankedStatus != rankedStatuses.NOT_SUBMITTED and self.rankedStatus != rankedStatuses.NEED_UPDATE:
                # We get beatmap data from osu!api, save it in db
                self.addBeatmapToDB()
        else:
            log.debug("Beatmap found in db")

        log.debug("{}\n{}\n{}\n{}".format(self.starsStd, self.starsTaiko, self.starsCtb, self.starsMania))

    def getData(self, totalScores=0, version=4):
        """
        Return this beatmap's data (header) for getscores

        totalScores -- number of scores, written in the header
        version -- version of the requesting client. Below 4, LOVED is sent as QUALIFIED.
        return -- beatmap header for getscores
        """
        # Fix loved maps for old clients
        if version < 4 and self.rankedStatus == rankedStatuses.LOVED:
            rankedStatusOutput = rankedStatuses.QUALIFIED
        else:
            rankedStatusOutput = self.rankedStatus
        data = "{}|false".format(rankedStatusOutput)
        if self.rankedStatus != rankedStatuses.NOT_SUBMITTED and self.rankedStatus != rankedStatuses.NEED_UPDATE and self.rankedStatus != rankedStatuses.UNKNOWN:
            # If the beatmap is updated and exists, the client needs more data
            data += "|{}|{}|{}\n{}\n{}\n{}\n".format(self.beatmapID, self.beatmapSetID, totalScores, self.offset, self.songName, self.rating)

        # Return the header
        return data

    def getCachedTillerinoPP(self):
        """
        Returned cached pp values for 100, 99, 98 and 95 acc nomod
        (used ONLY with Tillerino, pp is always calculated with oppai when submitting scores)

        return -- list with pp values. [0,0,0,0] if the beatmap has no row in db.
        """
        data = glob.db.fetch("SELECT pp_100, pp_99, pp_98, pp_95 FROM beatmaps WHERE beatmap_md5 = %s LIMIT 1", [self.fileMD5])
        if data is None:
            return [0,0,0,0]
        return [data["pp_100"], data["pp_99"], data["pp_98"], data["pp_95"]]

    def saveCachedTillerinoPP(self, l):
        """
        Save cached pp for tillerino

        l -- list with 4 default pp values ([100,99,98,95])
        """
        glob.db.execute("UPDATE beatmaps SET pp_100 = %s, pp_99 = %s, pp_98 = %s, pp_95 = %s WHERE beatmap_md5 = %s", [l[0], l[1], l[2], l[3], self.fileMD5])

    @property
    def is_rankable(self):
        """
        True if the ranked status is RANKED or higher and is not UNKNOWN.
        """
        return self.rankedStatus >= rankedStatuses.RANKED and self.rankedStatus != rankedStatuses.UNKNOWN
def convertRankedStatus(approvedStatus):
    """
    Translate an osu!api approved status into the ranked status used by getscores.

    approvedStatus -- approved status, from osu!api (anything int() accepts)
    return -- matching rankedStatuses value: PENDING for 0 and below,
              UNKNOWN for values above 4
    """
    status = int(approvedStatus)

    # Everything at or below zero is treated as pending
    if status <= 0:
        return rankedStatuses.PENDING

    # Positive values map one-to-one; the constant is looked up by name so
    # only the selected attribute of rankedStatuses is read
    names = {
        1: "RANKED",
        2: "APPROVED",
        3: "QUALIFIED",
        4: "LOVED",
    }
    return getattr(rankedStatuses, names.get(status, "UNKNOWN"))
def incrementPlaycount(md5, passed):
    """
    Add one play (and optionally one pass) to a beatmap's counters in db.

    md5 -- beatmap md5
    passed -- if True, increment passcount too
    """
    params = [md5]

    # Every call counts as a play
    glob.db.execute("UPDATE beatmaps SET playcount = playcount+1 WHERE beatmap_md5 = %s LIMIT 1", params)

    if not passed:
        return

    # Passes are counted on top of the play
    glob.db.execute("UPDATE beatmaps SET passcount = passcount+1 WHERE beatmap_md5 = %s LIMIT 1", params)