pep.py/helpers/packetHelper.pyx

269 lines
6.2 KiB
Cython
Raw Normal View History

2016-04-19 17:40:59 +00:00
import struct
2016-05-18 17:12:46 +00:00
from constants import dataTypes
2016-04-19 17:40:59 +00:00
cpdef bytearray uleb128Encode(int num):
    """
    Encode an int as unsigned LEB128.

    The value is emitted in 7-bit groups, least significant group first.
    Every byte except the last one has its high bit (128) set.

    :param num: int to encode. 0 yields a single zero byte; a negative
                value never enters the loop and yields an empty bytearray
    :return: bytearray with encoded number
    """
    cdef bytearray encoded = bytearray()
    cdef int group

    if num == 0:
        return bytearray(b"\x00")

    while num > 0:
        # Take the low 7 bits, then drop them from the value
        group = num & 127
        num >>= 7
        if num != 0:
            # More groups follow: flag this byte as a continuation
            group |= 128
        encoded.append(group)
    return encoded
cpdef list uleb128Decode(bytes num):
    """
    Decode an unsigned LEB128 value from the start of `num`.

    Bytes are consumed until one without the continuation bit (128) is
    found; anything after that byte is ignored. Running past the end of
    `num` before such a byte is found fails on the index lookup.

    :param num: encoded uleb128 int (may be followed by other data)
    :return: [total, length] -- the decoded value and how many bytes it used
    """
    cdef object total = 0   # Python int, so large values cannot overflow
    cdef int consumed = 0
    cdef int shift = 0
    cdef int current

    while True:
        current = num[consumed]
        consumed += 1
        # Low 7 bits carry payload, placed at the current bit offset
        total |= int(current & 127) << shift
        if not (current & 128):
            # High bit clear: this was the last byte
            break
        shift += 7
    return [total, consumed]
cpdef unpackData(bytes data, int dataType):
    """
    Unpacks a single section of a packet with `struct`.

    :param data: bytes to unpack; must be exactly as long as the selected
                 struct format requires
    :param dataType: data type (compared against the `dataTypes` constants).
                     Any type not listed below is read as an unsigned byte
    :return: first value returned by `struct.unpack`
    """
    # Unsigned byte unless one of the known types matches
    unpackType = "<B"
    for knownType, formatCode in (
        (dataTypes.UINT16, "<H"),
        (dataTypes.SINT16, "<h"),
        (dataTypes.UINT32, "<L"),
        (dataTypes.SINT32, "<l"),
        (dataTypes.UINT64, "<Q"),
        (dataTypes.SINT64, "<q"),
        (dataTypes.STRING, "<s"),
        (dataTypes.FFLOAT, "<f"),
    ):
        if dataType == knownType:
            # First match wins
            unpackType = formatCode
            break

    # Unpack
    return struct.unpack(unpackType, bytes(data))[0]
2016-04-19 17:40:59 +00:00
cpdef bytes packData(__data, int dataType):
    """
    Packs a single section of a packet.

    Layout per data type:
      - BBYTES: `__data` is returned as is.
      - INT_LIST: element count packed as UINT16, then every element
        packed as SINT32.
      - STRING: a single 0x00 byte when there is nothing to write,
        otherwise 0x0B, the uleb128-encoded byte length and the latin_1
        bytes. Characters latin_1 cannot represent are dropped, and the
        length prefix counts the bytes actually written.
      - everything else: `struct.pack` with the matching little-endian
        format; unknown types are packed as an unsigned byte.

    :param __data: data to pack
    :param dataType: data type (compared against the `dataTypes` constants)
    :return: packed bytes
    """
    cdef bytes data = bytes()   # data to return
    cdef bytes encoded          # latin_1 form of a string payload
    cdef str packType

    if dataType == dataTypes.BBYTES:
        # Bytes, do not use pack, pass through
        data = __data
        return data

    if dataType == dataTypes.INT_LIST:
        # Pack manually: length first, then all elements
        data = packData(len(__data), dataTypes.UINT16)
        for i in __data:
            data += packData(i, dataTypes.SINT32)
        return data

    if dataType == dataTypes.STRING:
        # String, do not use pack, do manually
        if len(__data) == 0:
            # Empty string
            return b"\x00"
        # Encode first: "ignore" may drop characters, so the length prefix
        # has to be taken from the encoded bytes, not from the str
        encoded = str.encode(__data, "latin_1", "ignore")
        if len(encoded) == 0:
            # Nothing survived encoding, send it as an empty string
            return b"\x00"
        data = b"\x0B"
        data += uleb128Encode(len(encoded))
        data += encoded
        return data

    # Fixed-size values: get right pack type
    if dataType == dataTypes.UINT16:
        packType = "<H"
    elif dataType == dataTypes.SINT16:
        packType = "<h"
    elif dataType == dataTypes.UINT32:
        packType = "<L"
    elif dataType == dataTypes.SINT32:
        packType = "<l"
    elif dataType == dataTypes.UINT64:
        packType = "<Q"
    elif dataType == dataTypes.SINT64:
        packType = "<q"
    elif dataType == dataTypes.FFLOAT:
        packType = "<f"
    else:
        packType = "<B"

    return struct.pack(packType, __data)
2016-12-26 09:33:05 +00:00
cpdef bytes buildPacket(int __packet, list __packetData = None):
    """
    Builds a packet: 7 header bytes followed by the packed sections.

    Header layout: packet ID (int16), one unused zero byte, payload
    length (int32), all little-endian.

    :param __packet: packet ID
    :param __packetData: packet structure [[data, dataType], [data, dataType], ...]
                         None (the default) means no payload
    :return: packet bytes
    """
    cdef list section
    cdef list chunks = []
    cdef bytes payload

    # Default argument
    if __packetData is None:
        __packetData = []

    # Pack every section, then glue them together once
    for section in __packetData:
        chunks.append(packData(section[0], section[1]))
    payload = b"".join(chunks)

    return (
        struct.pack("<h", __packet)         # packet id (int16)
        + b"\x00"                           # unused byte
        + struct.pack("<l", len(payload))   # packet length (int32)
        + payload                           # packet data
    )
cpdef int readPacketID(bytes stream):
    """
    Read the packet ID from a packet.

    The ID is the first two bytes of `stream`, unpacked as UINT16.

    :param stream: packet bytes
    :return: packet ID
    """
    cdef bytes idBytes = stream[0:2]
    packetID = unpackData(idBytes, dataTypes.UINT16)
    return packetID
2016-04-19 17:40:59 +00:00
cpdef int readPacketLength(bytes stream):
    """
    Read the packet data length from a packet.

    The length is stored in bytes 3:7 of `stream` (after the two ID bytes
    and one more byte that is skipped) and is unpacked as UINT32.

    :param stream: packet bytes
    :return: packet data length
    """
    cdef bytes lengthBytes = stream[3:7]
    packetLength = unpackData(lengthBytes, dataTypes.UINT32)
    return packetLength
2016-04-19 17:40:59 +00:00
2016-12-26 09:33:05 +00:00
cpdef readPacketData(bytes stream, list structure=None, bint hasFirstBytes = True):
    """
    Read packet data from `stream` according to `structure`

    Fields are read one after another in the order given by `structure`.
    Sizes per data type:
      - INT_LIST: UINT16 element count followed by that many SINT32 values
      - STRING: a single 0x00 byte (empty string), or a marker byte, a
        uleb128 length and that many bytes, decoded as latin_1
      - BYTE: 1 byte; UINT16/SINT16: 2 bytes; UINT32/SINT32/FFLOAT: 4 bytes;
        UINT64/SINT64: 8 bytes
    Any other data type has no known size, so `unpackData` receives an
    empty slice and the resulting `struct.error` propagates.

    :param stream: packet bytes
    :param structure: packet structure: [[name, dataType], [name, dataType], ...]
    :param hasFirstBytes: if True, `stream` has packetID and length bytes.
                          if False, `stream` has only packet data. Default: True
    :return: {name: unpackedValue, ...}
    """
    cdef dict data = {}     # result: field name -> decoded value
    cdef list field         # current [name, dataType] entry
    cdef list values        # decoded elements of an INT_LIST
    cdef list decoded       # [value, bytes used] from uleb128Decode
    cdef start, end         # bounds of the current field inside stream

    # Default list argument
    if structure is None:
        structure = []

    # Skip packet ID and packet length (7 header bytes) if needed
    end = 7 if hasFirstBytes else 0

    # Read packet
    for field in structure:
        # Every field starts where the previous one ended
        start = end
        name = field[0]
        fieldType = field[1]

        if fieldType == dataTypes.INT_LIST:
            # sInt32 list: read length (uInt16), then every element
            count = unpackData(stream[start:start+2], dataTypes.UINT16)
            values = []
            data[name] = values
            for j in range(count):
                values.append(unpackData(stream[start+2+(4*j):start+2+(4*(j+1))], dataTypes.SINT32))
            end = start+2+(4*count)
        elif fieldType == dataTypes.STRING:
            if stream[start] == 0:
                # Empty string
                data[name] = ""
                end = start+1
            else:
                # Non empty string: marker byte, uleb128 length, payload
                decoded = uleb128Decode(stream[start+1:])
                end = start+decoded[0]+decoded[1]+1
                # latin_1 maps every byte to the code point of the same
                # value, i.e. the same result as chr() on each byte
                data[name] = stream[start+1+decoded[1]:end].decode("latin_1")
        else:
            # Fixed-size value: work out its size, then unpack it
            if fieldType == dataTypes.BYTE:
                end = start+1
            elif fieldType == dataTypes.UINT16 or fieldType == dataTypes.SINT16:
                end = start+2
            elif fieldType == dataTypes.UINT32 or fieldType == dataTypes.SINT32:
                end = start+4
            elif fieldType == dataTypes.UINT64 or fieldType == dataTypes.SINT64:
                end = start+8
            elif fieldType == dataTypes.FFLOAT:
                # unpackData reads FFLOAT with "<f", which is 4 bytes
                end = start+4
            data[name] = unpackData(stream[start:end], fieldType)

    return data