Refactor Alternativa Protocol implementation

This commit is contained in:
Pyogenics
2025-04-07 17:40:15 +01:00
parent 8a96286bae
commit 4b2ba7eba1
3 changed files with 133 additions and 219 deletions

View File

@@ -1,5 +1,5 @@
''' '''
Copyright (c) 2024 Pyogenics Copyright (c) 2025 Pyogenics
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal
@@ -20,173 +20,144 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
''' '''
from io import BytesIO
from struct import unpack
from array import array
from zlib import decompress from zlib import decompress
from io import BytesIO
class OptionalMask: from .IOTools import unpackStream
def __init__(self):
self.optionalMask = []
def read(self, stream): def unwrapPacket(stream):
print("Read optional mask") print("Unwrapping packet")
# Read "Null-mask" field
nullMask = b""
nullMaskOffset = 0
nullMaskField = int.from_bytes(stream.read(1), "little") # Determine size and compression
nullMaskType = nullMaskField & 0b10000000 packetFlags = int.from_bytes(stream.read(1))
if nullMaskType == 0: compressedPacket = (packetFlags & 0b01000000) > 0
# Short null-mask: 5-29 bits
nullMaskLength = nullMaskField & 0b01100000
nullMask += bytes(nullMaskField & 0b00011111)
nullMask += stream.read(nullMaskLength) # 1,2 or 3 bytes
nullMaskOffset = 3
else:
# Long null-mask: 64 - 4194304 bytes
nullMaskLengthSize = nullMaskField & 0b01000000
nullMaskLength = nullMaskField & 0b00111111
if nullMaskLengthSize > 0:
# Long length: 22 bits
nullMaskLength = nullMaskLength << 16
nullMaskLength += int.from_bytes(stream.read(2), "big")
else:
# Short length: 6 bits
pass
nullMask += stream.read(nullMaskLength)
nullMaskOffset = 0
nullMask = BytesIO(nullMask) packetLength = 0
# Process first byte (the first byte is missing some bits on some nullmask configs) packetLengthType = packetFlags & 0b10000000
maskByte = int.from_bytes(nullMask.read(1)) if packetLengthType == 0:
for bitI in range(7 - nullMaskOffset, -1, -1): # This is a short packet
self.optionalMask.append( packetLength = int.from_bytes(stream.read(1))
not bool(maskByte & (2**bitI)) packetLength += (packetFlags & 0b00111111) << 8 # Part of the length is embedded in the flags field
)
# Process the rest of the bytes
for maskByte in nullMask.read():
for bitI in range(7, -1, -1):
self.optionalMask.append(
not bool(maskByte & (2**bitI))
)
print(f"Optional mask flags: {len(self.optionalMask)}")
def getOptional(self):
optional = self.optionalMask.pop(0)
return optional
def getOptionals(self, count):
optionals = ()
for _ in range(count):
optionals += (self.optionalMask.pop(0),)
return optionals
def getLength(self):
return len(self.optionalMask)
def readPacket(stream):
print("Reading packet")
# Read "Package Length" field
packageLength = 0
packageGzip = False
packageLengthField = int.from_bytes(stream.read(1), "little")
packageLengthSize = packageLengthField & 0b10000000
if packageLengthSize == 0:
# Short package: 14 bits
packageLength += (packageLengthField & 0b00111111) << 8
packageLength += int.from_bytes(stream.read(1), "little")
packageGzip = packageLengthField & 0b01000000
else: else:
# Long package: 31 bits # This is a long packet
packageLength += (packageLengthField & 0b00111111) << 24 packetLength = int.from_bytes(stream.read(3), "big")
packageLength += int.from_bytes(stream.read(3), "little") packetLength += (packetFlags & 0b00111111) << 24
packageGzip = packageLengthField & 0b01000000
# Decompress gzip data
package = stream.read()
if packageGzip:
print("Decompressing packet")
package = decompress(package)
package = BytesIO(package)
return package # Decompress the packet if needed
packetData = stream.read(packetLength)
if compressedPacket:
print("Decompressing packet")
packetData = decompress(packetData)
return BytesIO(packetData)
def readOptionalMask(stream):
print("Reading optional mask")
optionalMask = []
# Determine mask type (there are multiple length types)
maskFlags = int.from_bytes(stream.read(1))
maskLengthType = maskFlags & 0b10000000
if maskLengthType == 0:
# Short mask: 5 optional bits + upto 3 extra bytes
# First read the integrated optional bits
integratedOptionalBits = maskFlags << 3 # Trim flag bits so we're left with the optionals and some padding bits
for bitI in range(7, 2, -1): #0b11111000 left to right
optional = (integratedOptionalBits & 2**bitI) == 0
optionalMask.append(optional)
# Now read the external bytes
externalByteCount = (maskFlags & 0b01100000) >> 5
externalBytes = stream.read(externalByteCount)
for externalByte in externalBytes:
for bitI in range(7, -1, -1): #0b11111111 left to right
optional = (externalByte & 2**bitI) == 0
optionalMask.append(optional)
else:
# This type of mask encodes an extra length/count field to increase the number of possible optionals significantly
maskLengthType = maskFlags & 0b01000000
externalByteCount = 0
if maskLengthType == 0:
# Medium mask: stores number of bytes used for the optional mask in the last 6 bits of the flags
externalByteCount = maskFlags & 0b00111111
else:
# Long mask: # Medium mask: stores number of bytes used for the optional mask in the last 6 bits of the flags + 2 extra bytes
externalByteCount = (maskFlags & 0b00111111) << 16
externalByteCount += int.from_bytes(stream.read(2), "big")
# Read the external bytes
externalBytes = stream.read(externalByteCount)
for externalByte in externalBytes:
for bitI in range(7, -1, -1): #0b11111111 left to right
optional = (externalByte & 2**bitI) == 0
optionalMask.append(optional)
optionalMask.reverse()
return optionalMask
''' '''
Array Array type readers
''' '''
def readArrayLength(package): def readArrayLength(packet):
arrayLength = 0 arrayLength = 0
arrayField = int.from_bytes(package.read(1), "little") arrayFlags = int.from_bytes(packet.read(1))
arrayLengthType = arrayField & 0b10000000 arrayLengthType = arrayFlags & 0b10000000
# Short array length
if arrayLengthType == 0: if arrayLengthType == 0:
# Length of the array is contained in the last 7 bits of this byte # Short array
arrayLength = arrayField & 0b01111111 arrayLength = arrayFlags & 0b01111111
else: # Must be large array length else:
longArrayLengthType = arrayField & 0b01000000 # Long array
# Length in last 6 bits + next byte arrayLengthType = arrayFlags & 0b01000000
if longArrayLengthType == 0: if arrayLengthType == 0:
lengthByte = int.from_bytes(package.read(1), "little") # Length in last 6 bits of flags + next byte
arrayLength = (arrayField & 0b00111111) << 8 arrayLength = (arrayFlags & 0b00111111) << 8
arrayLength += lengthByte arrayLength += int.from_bytes(packet.read(1))
else: # Length in last 6 bits + next 2 bytes else:
lengthBytes = int.from_bytes(package.read(2), "big") # Length in last 6 bits of flags + next 2 byte
arrayLength = (arrayField & 0b00111111) << 16 arrayLength = (arrayFlags & 0b00111111) << 16
arrayLength += lengthBytes arrayLength += int.from_bytes(packet.read(2), "big")
return arrayLength return arrayLength
def readObjectArray(package, objReader, optionalMask): def readObjectArray(packet, objReader, optionalMask):
length = readArrayLength(package) arrayLength = readArrayLength(packet)
objects = [] objects = []
for _ in range(length): for _ in range(arrayLength):
obj = objReader() obj = objReader()
obj.read(package, optionalMask) obj.read(packet, optionalMask)
objects.append(obj) objects.append(obj)
return objects return objects
def readString(package): def readString(packet):
stringLength = readArrayLength(package) stringLength = readArrayLength(packet)
string = package.read(stringLength) string = packet.read(stringLength)
string = string.decode("utf-8") string = string.decode("utf-8")
return string return string
def readInt16Array(package): def readInt16Array(packet):
length = readArrayLength(package) arrayLength = readArrayLength(packet)
integers = unpack(f"{length}h", package.read(length*2)) integers = unpackStream(f"{arrayLength}h", packet)
integers = array("h", integers)
return integers return list(integers)
def readIntArray(package): def readIntArray(packet):
length = readArrayLength(package) arrayLength = readArrayLength(packet)
integers = unpack(f"{length}i", package.read(length*4)) integers = unpackStream(f"{arrayLength}i", packet)
integers = array("i", integers)
return integers return list(integers)
def readInt64Array(package): def readInt64Array(packet):
length = readArrayLength(package) arrayLength = readArrayLength(packet)
integers = unpack(f"{length}q", package.read(length*8)) integers = unpackStream(f"{arrayLength}q", packet)
integers = array("q", integers)
return integers return list(integers)
def readFloatArray(package): def readFloatArray(packet):
length = readArrayLength(package) arrayLength = readArrayLength(packet)
floats = unpack(f">{length}f", package.read(length*4)) floats = unpackStream(f">{arrayLength}f", packet)
floats = array("f", floats)
return floats return list(floats)

View File

@@ -41,8 +41,6 @@ class AtlasRect:
self.name = AlternativaProtocol.readString(stream) self.name = AlternativaProtocol.readString(stream)
self.width, self.x, self.y = unpackStream(">3I", stream) self.width, self.x, self.y = unpackStream(">3I", stream)
print(f"[AtlasRect height: {self.height} libraryName: {self.libraryName} name: {self.name} width: {self.width} x: {self.x} y: {self.y}]")
class CollisionBox: class CollisionBox:
def __init__(self): def __init__(self):
self.position = (0.0, 0.0, 0.0) self.position = (0.0, 0.0, 0.0)
@@ -53,8 +51,6 @@ class CollisionBox:
self.position = unpackStream(">3f", stream) self.position = unpackStream(">3f", stream)
self.rotation = unpackStream(">3f", stream) self.rotation = unpackStream(">3f", stream)
self.size = unpackStream(">3f", stream) self.size = unpackStream(">3f", stream)
# print(f"[CollisionBox position: {self.position} rotation: {self.rotation} size: {self.size}]")
class CollisionPlane: class CollisionPlane:
def __init__(self): def __init__(self):
@@ -68,8 +64,6 @@ class CollisionPlane:
self.position = unpackStream(">3f", stream) self.position = unpackStream(">3f", stream)
self.rotation = unpackStream(">3f", stream) self.rotation = unpackStream(">3f", stream)
self.width, = unpackStream(">d", stream) self.width, = unpackStream(">d", stream)
# print(f"[CollisionPlane lenght: {self.length} position: {self.position} rotation: {self.rotation} width: {self.width}]")
class CollisionTriangle: class CollisionTriangle:
def __init__(self): def __init__(self):
@@ -87,8 +81,6 @@ class CollisionTriangle:
self.v0 = unpackStream(">3f", stream) self.v0 = unpackStream(">3f", stream)
self.v1 = unpackStream(">3f", stream) self.v1 = unpackStream(">3f", stream)
self.v2 = unpackStream(">3f", stream) self.v2 = unpackStream(">3f", stream)
# print(f"[CollisionTriangle length: {self.length} position: {self.position} rotation: {self.rotation} v0: {self.v0} v1: {self.v1} v2: {self.v2}]")
class ScalarParameter: class ScalarParameter:
def __init__(self): def __init__(self):
@@ -108,7 +100,7 @@ class TextureParameter:
self.libraryName = None self.libraryName = None
def read(self, stream, optionalMask): def read(self, stream, optionalMask):
if optionalMask.getOptional(): if optionalMask.pop():
self.libraryName = AlternativaProtocol.readString(stream) self.libraryName = AlternativaProtocol.readString(stream)
self.name = AlternativaProtocol.readString(stream) self.name = AlternativaProtocol.readString(stream)
self.textureName = AlternativaProtocol.readString(stream) self.textureName = AlternativaProtocol.readString(stream)
@@ -151,24 +143,7 @@ class Atlas:
self.rects = [] self.rects = []
self.width = 0 self.width = 0
# Get the rect's texture from an atlas
# XXX: Handle padding?
def resolveRectImage(self, rectName, atlasImage):
rect = None
for childRect in self.rects:
if childRect.name == rectName:
rect = childRect
if rect == None:
raise RuntimeError(f"Couldn't find rect with name: {rectName}")
# Cut the texture out
rectTexture = atlasImage.crop(
(rect.x, rect.y, rect.x+rect.width, rect.y+rect.height)
)
return rectTexture
def read(self, stream, optionalMask): def read(self, stream, optionalMask):
print("Read Atlas")
self.height, unpackStream(">i", stream) self.height, unpackStream(">i", stream)
self.name = AlternativaProtocol.readString(stream) self.name = AlternativaProtocol.readString(stream)
self.padding = unpackStream(">I", stream) self.padding = unpackStream(">I", stream)
@@ -183,7 +158,6 @@ class Batch:
self.propIDs = "" self.propIDs = ""
def read(self, stream, optionalMask): def read(self, stream, optionalMask):
print("Read Batch")
self.materialID, = unpackStream(">I", stream) self.materialID, = unpackStream(">I", stream)
self.name = AlternativaProtocol.readString(stream) self.name = AlternativaProtocol.readString(stream)
self.position = unpackStream(">3f", stream) self.position = unpackStream(">3f", stream)
@@ -196,7 +170,6 @@ class CollisionGeometry:
self.triangles = [] self.triangles = []
def read(self, stream, optionalMask): def read(self, stream, optionalMask):
print("Read CollisionGeometry")
self.boxes = AlternativaProtocol.readObjectArray(stream, CollisionBox, optionalMask) self.boxes = AlternativaProtocol.readObjectArray(stream, CollisionBox, optionalMask)
self.planes = AlternativaProtocol.readObjectArray(stream, CollisionPlane, optionalMask) self.planes = AlternativaProtocol.readObjectArray(stream, CollisionPlane, optionalMask)
self.triangles = AlternativaProtocol.readObjectArray(stream, CollisionTriangle, optionalMask) self.triangles = AlternativaProtocol.readObjectArray(stream, CollisionTriangle, optionalMask)
@@ -214,46 +187,20 @@ class Material:
self.vector3Parameters = None self.vector3Parameters = None
self.vector4Parameters = None self.vector4Parameters = None
def getTextureParameterByName(self, name):
for textureParameter in self.textureParameters:
if textureParameter.name == name: return textureParameter
raise RuntimeError(f"Couldn't find texture parameter with name: {name}")
def read(self, stream, optionalMask): def read(self, stream, optionalMask):
print(f"Read Material")
self.ID, = unpackStream(">I", stream) self.ID, = unpackStream(">I", stream)
self.name = AlternativaProtocol.readString(stream) self.name = AlternativaProtocol.readString(stream)
if optionalMask.getOptional(): if optionalMask.pop():
self.scalarParameters = AlternativaProtocol.readObjectArray(stream, ScalarParameter, optionalMask) self.scalarParameters = AlternativaProtocol.readObjectArray(stream, ScalarParameter, optionalMask)
self.shader = AlternativaProtocol.readString(stream) self.shader = AlternativaProtocol.readString(stream)
self.textureParameters = AlternativaProtocol.readObjectArray(stream, TextureParameter, optionalMask) self.textureParameters = AlternativaProtocol.readObjectArray(stream, TextureParameter, optionalMask)
if optionalMask.getOptional(): if optionalMask.pop():
self.vector2Parameters = AlternativaProtocol.readObjectArray(stream, Vector2Parameter, optionalMask) self.vector2Parameters = AlternativaProtocol.readObjectArray(stream, Vector2Parameter, optionalMask)
if optionalMask.getOptional(): if optionalMask.pop():
self.vector3Parameters = AlternativaProtocol.readObjectArray(stream, Vector3Parameter, optionalMask) self.vector3Parameters = AlternativaProtocol.readObjectArray(stream, Vector3Parameter, optionalMask)
if optionalMask.getOptional(): if optionalMask.pop():
self.vector4Parameters = AlternativaProtocol.readObjectArray(stream, Vector4Parameter, optionalMask) self.vector4Parameters = AlternativaProtocol.readObjectArray(stream, Vector4Parameter, optionalMask)
#TODO: tanki has more than this number of spawn types now, investigate it
BATTLEMAP_SPAWNPOINTTYPE_DM = 0
BATTLEMAP_SPAWNPOINTTYPE_DOM_TEAMA = 1
BATTLEMAP_SPAWNPOINTTYPE_DOM_TEAMB = 2
BATTLEMAP_SPAWNPOINTTYPE_RUGBY_TEAMA = 3
BATTLEMAP_SPAWNPOINTTYPE_RUGBY_TEAMB = 4
BATTLEMAP_SPAWNPOINTTYPE_TEAMA = 5
BATTLEMAP_SPAWNPOINTTYPE_TEAMB = 6
BATTLEMAP_SPAWNPOINTTYPE_UNKNOWN = 7
BattleMapSpawnPointTypeName = {
BATTLEMAP_SPAWNPOINTTYPE_DM: "Deathmatch",
BATTLEMAP_SPAWNPOINTTYPE_DOM_TEAMA: "DominationTeamA",
BATTLEMAP_SPAWNPOINTTYPE_DOM_TEAMB: "DominationTeamB",
BATTLEMAP_SPAWNPOINTTYPE_RUGBY_TEAMA: "RugbyTeamA",
BATTLEMAP_SPAWNPOINTTYPE_RUGBY_TEAMB: "RugbyTeamB",
BATTLEMAP_SPAWNPOINTTYPE_TEAMA: "TeamA",
BATTLEMAP_SPAWNPOINTTYPE_TEAMB: "TeamB",
BATTLEMAP_SPAWNPOINTTYPE_UNKNOWN: "Unknown"
}
class SpawnPoint: class SpawnPoint:
def __init__(self): def __init__(self):
self.position = (0.0, 0.0, 0.0) self.position = (0.0, 0.0, 0.0)
@@ -275,20 +222,20 @@ class Prop:
# Optional # Optional
self.groupName = None self.groupName = None
self.rotation = (0.0, 0.0, 0.0) self.rotation = None
self.scale = (1.0, 1.0, 1.0) self.scale = None
def read(self, stream, optionalMask): def read(self, stream, optionalMask):
if optionalMask.getOptional(): if optionalMask.pop():
self.groupName = AlternativaProtocol.readString(stream) self.groupName = AlternativaProtocol.readString(stream)
self.ID, = unpackStream(">I", stream) self.ID, = unpackStream(">I", stream)
self.libraryName = AlternativaProtocol.readString(stream) self.libraryName = AlternativaProtocol.readString(stream)
self.materialID, = unpackStream(">I", stream) self.materialID, = unpackStream(">I", stream)
self.name = AlternativaProtocol.readString(stream) self.name = AlternativaProtocol.readString(stream)
self.position = unpackStream(">3f", stream) self.position = unpackStream(">3f", stream)
if optionalMask.getOptional(): if optionalMask.pop():
self.rotation = unpackStream(">3f", stream) self.rotation = unpackStream(">3f", stream)
if optionalMask.getOptional(): if optionalMask.pop():
self.scale = unpackStream(">3f", stream) self.scale = unpackStream(">3f", stream)
''' '''
@@ -304,36 +251,26 @@ class BattleMap:
self.spawnPoints = [] self.spawnPoints = []
self.staticGeometry = [] self.staticGeometry = []
'''
Getters
'''
def getMaterialByID(self, materialID):
for material in self.materials:
if material.ID == materialID: return material
raise RuntimeError(f"Couldn't find material with ID: {materialID}")
''' '''
IO IO
''' '''
def read(self, stream): def read(self, stream):
print("Reading BIN map") print("Reading BattleMap")
# Read packet # Read packet
packet = AlternativaProtocol.readPacket(stream) packet = AlternativaProtocol.unwrapPacket(stream)
optionalMask = AlternativaProtocol.OptionalMask() optionalMask = AlternativaProtocol.readOptionalMask(packet)
optionalMask.read(packet)
# Read data # Read data
if optionalMask.getOptional(): if optionalMask.pop():
self.atlases = AlternativaProtocol.readObjectArray(packet, Atlas, optionalMask) self.atlases = AlternativaProtocol.readObjectArray(packet, Atlas, optionalMask)
if optionalMask.getOptional(): if optionalMask.pop():
self.batches = AlternativaProtocol.readObjectArray(packet, Batch, optionalMask) self.batches = AlternativaProtocol.readObjectArray(packet, Batch, optionalMask)
self.collisionGeometry = CollisionGeometry() self.collisionGeometry = CollisionGeometry()
self.collisionGeometry.read(packet, optionalMask) self.collisionGeometry.read(packet, optionalMask)
self.collisionGeometryOutsideGamingZone = CollisionGeometry() self.collisionGeometryOutsideGamingZone = CollisionGeometry()
self.collisionGeometryOutsideGamingZone.read(packet, optionalMask) self.collisionGeometryOutsideGamingZone.read(packet, optionalMask)
self.materials = AlternativaProtocol.readObjectArray(packet, Material, optionalMask) self.materials = AlternativaProtocol.readObjectArray(packet, Material, optionalMask)
if optionalMask.getOptional(): if optionalMask.pop():
self.spawnPoints = AlternativaProtocol.readObjectArray(packet, SpawnPoint, optionalMask) self.spawnPoints = AlternativaProtocol.readObjectArray(packet, SpawnPoint, optionalMask)
self.staticGeometry = AlternativaProtocol.readObjectArray(packet, Prop, optionalMask) self.staticGeometry = AlternativaProtocol.readObjectArray(packet, Prop, optionalMask)

View File

@@ -244,8 +244,14 @@ class BattleMapBlenderImporter:
propOB.name = f"{propData.name}_{propData.ID}" propOB.name = f"{propData.name}_{propData.ID}"
propOB.location = propData.position propOB.location = propData.position
propOB.rotation_mode = "XYZ" propOB.rotation_mode = "XYZ"
propOB.rotation_euler = propData.rotation propRotation = propData.rotation
propOB.scale = propData.scale if propRotation == None:
propRotation = (0.0, 0.0, 0.0)
propOB.rotation_euler = propRotation
propScale = propData.scale
if propScale == None:
propScale = (1.0, 1.0, 1.0)
propOB.scale = propScale
# Material # Material
ma = self.materials[propData.materialID] ma = self.materials[propData.materialID]