Module ddCommunication.protocols.LCPWirelessProtocol.DDCPReplyTelegrams.ReadParamValueReply

Expand source code
from enum import IntEnum
from ..DDCPDataTypes import DDCPDataTypes
from ..DDCPErrorCodes import DDCPErrorCodes
import struct


class ReadParamValueReply:

    def __init__(self, values: list):
        self.Values = values

    def GetBytes(self):
        pass

    def GetMessageType(self):
        return 0x14

    def GetTypeHeader(self):
        pass

    @staticmethod
    def FromBytes(data):
        paramValues = []
        while len(data) > 3:
            length = data[0]
            ErrorCode = data[1]
            if DDCPErrorCodes(ErrorCode).name != 'DB_NO_ERROR':
                raise ConnectionError(f'{DDCPErrorCodes(ErrorCode).name}')
            DataType = data[2]
            if len(data) <= 3 + length:
                paramValue = ReadParamValueReply.parseParam(
                    data[3:3 + length], DataType)
                paramValues.append(paramValue)
            data = data[3 + length:]

        return ReadParamValueReply(paramValues)

    def parseParam(paramData, DataType):
        # Todo: implement parsing for all data types

        if DataType == DDCPDataTypes.DT_UNDEFINED:
            pass
        elif DataType == DDCPDataTypes.DT_BOOLEAN:
            return bool.from_bytes(bytes(paramData), byteorder="little")
        elif DataType == DDCPDataTypes.DT_INTEGER8:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=True)
        elif DataType == DDCPDataTypes.DT_INTEGER16:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=True)
        elif DataType == DDCPDataTypes.DT_INTEGER32:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=True)
        elif DataType == DDCPDataTypes.DT_UNSIGNED8:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=False)
        elif DataType == DDCPDataTypes.DT_UNSIGNED16:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=False)
        elif DataType == DDCPDataTypes.DT_UNSIGNED32:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=False)
        elif DataType == DDCPDataTypes.DT_FLOATINGPOINT:
            pass
        elif DataType == DDCPDataTypes.DT_VISIBLE_STRING:
            return str(bytes(paramData), encoding="utf-8")
        elif DataType == DDCPDataTypes.DT_BYTE_STRING:
            return bytes(paramData)
            pass
        elif DataType == DDCPDataTypes.DT_TIMEOFDAY:
            pass
        elif DataType == DDCPDataTypes.DT_TIME_DIFFERENCE:
            pass
        elif DataType == DDCPDataTypes.DT_NORM_VALUE_N2:
            pass
        elif DataType == DDCPDataTypes.DT_NORM_VALUE_N4:
            pass
        elif DataType == DDCPDataTypes.DT_BIT_SEQUENCE:
            pass
        elif DataType == DDCPDataTypes.DT_DATE:
            pass
        elif DataType == DDCPDataTypes.DT_TIMEOFDAY_WITHOUT_DATE:
            pass
        elif DataType == DDCPDataTypes.DT_TIME_DIFFERENCE_WITH_DATE_INDICATION:
            pass
        elif DataType == DDCPDataTypes.DT_TIME_DIFFERENCE_WITHOUT_DATE_INDICATION:
            pass
        elif DataType == DDCPDataTypes.DT_BYTE:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=False)
        elif DataType == DDCPDataTypes.DT_WORD:
            pass
        elif DataType == DDCPDataTypes.DT_DOUBLE_WORD:
            pass
        elif DataType == DDCPDataTypes.DT_ERROR:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=False)

        return paramData

Classes

class ReadParamValueReply (values: list)
Expand source code
class ReadParamValueReply:

    def __init__(self, values: list):
        self.Values = values

    def GetBytes(self):
        pass

    def GetMessageType(self):
        return 0x14

    def GetTypeHeader(self):
        pass

    @staticmethod
    def FromBytes(data):
        paramValues = []
        while len(data) > 3:
            length = data[0]
            ErrorCode = data[1]
            if DDCPErrorCodes(ErrorCode).name != 'DB_NO_ERROR':
                raise ConnectionError(f'{DDCPErrorCodes(ErrorCode).name}')
            DataType = data[2]
            if len(data) <= 3 + length:
                paramValue = ReadParamValueReply.parseParam(
                    data[3:3 + length], DataType)
                paramValues.append(paramValue)
            data = data[3 + length:]

        return ReadParamValueReply(paramValues)

    def parseParam(paramData, DataType):
        # Todo: implement parsing for all data types

        if DataType == DDCPDataTypes.DT_UNDEFINED:
            pass
        elif DataType == DDCPDataTypes.DT_BOOLEAN:
            return bool.from_bytes(bytes(paramData), byteorder="little")
        elif DataType == DDCPDataTypes.DT_INTEGER8:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=True)
        elif DataType == DDCPDataTypes.DT_INTEGER16:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=True)
        elif DataType == DDCPDataTypes.DT_INTEGER32:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=True)
        elif DataType == DDCPDataTypes.DT_UNSIGNED8:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=False)
        elif DataType == DDCPDataTypes.DT_UNSIGNED16:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=False)
        elif DataType == DDCPDataTypes.DT_UNSIGNED32:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=False)
        elif DataType == DDCPDataTypes.DT_FLOATINGPOINT:
            pass
        elif DataType == DDCPDataTypes.DT_VISIBLE_STRING:
            return str(bytes(paramData), encoding="utf-8")
        elif DataType == DDCPDataTypes.DT_BYTE_STRING:
            return bytes(paramData)
            pass
        elif DataType == DDCPDataTypes.DT_TIMEOFDAY:
            pass
        elif DataType == DDCPDataTypes.DT_TIME_DIFFERENCE:
            pass
        elif DataType == DDCPDataTypes.DT_NORM_VALUE_N2:
            pass
        elif DataType == DDCPDataTypes.DT_NORM_VALUE_N4:
            pass
        elif DataType == DDCPDataTypes.DT_BIT_SEQUENCE:
            pass
        elif DataType == DDCPDataTypes.DT_DATE:
            pass
        elif DataType == DDCPDataTypes.DT_TIMEOFDAY_WITHOUT_DATE:
            pass
        elif DataType == DDCPDataTypes.DT_TIME_DIFFERENCE_WITH_DATE_INDICATION:
            pass
        elif DataType == DDCPDataTypes.DT_TIME_DIFFERENCE_WITHOUT_DATE_INDICATION:
            pass
        elif DataType == DDCPDataTypes.DT_BYTE:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=False)
        elif DataType == DDCPDataTypes.DT_WORD:
            pass
        elif DataType == DDCPDataTypes.DT_DOUBLE_WORD:
            pass
        elif DataType == DDCPDataTypes.DT_ERROR:
            return int.from_bytes(bytes(paramData), byteorder="little", signed=False)

        return paramData

Static methods

def FromBytes(data)
Expand source code
@staticmethod
def FromBytes(data):
    paramValues = []
    while len(data) > 3:
        length = data[0]
        ErrorCode = data[1]
        if DDCPErrorCodes(ErrorCode).name != 'DB_NO_ERROR':
            raise ConnectionError(f'{DDCPErrorCodes(ErrorCode).name}')
        DataType = data[2]
        if len(data) <= 3 + length:
            paramValue = ReadParamValueReply.parseParam(
                data[3:3 + length], DataType)
            paramValues.append(paramValue)
        data = data[3 + length:]

    return ReadParamValueReply(paramValues)

Methods

def GetBytes(self)
Expand source code
def GetBytes(self):
    pass
def GetMessageType(self)
Expand source code
def GetMessageType(self):
    return 0x14
def GetTypeHeader(self)
Expand source code
def GetTypeHeader(self):
    pass
def parseParam(paramData, DataType)
Expand source code
def parseParam(paramData, DataType):
    # Todo: implement parsing for all data types

    if DataType == DDCPDataTypes.DT_UNDEFINED:
        pass
    elif DataType == DDCPDataTypes.DT_BOOLEAN:
        return bool.from_bytes(bytes(paramData), byteorder="little")
    elif DataType == DDCPDataTypes.DT_INTEGER8:
        return int.from_bytes(bytes(paramData), byteorder="little", signed=True)
    elif DataType == DDCPDataTypes.DT_INTEGER16:
        return int.from_bytes(bytes(paramData), byteorder="little", signed=True)
    elif DataType == DDCPDataTypes.DT_INTEGER32:
        return int.from_bytes(bytes(paramData), byteorder="little", signed=True)
    elif DataType == DDCPDataTypes.DT_UNSIGNED8:
        return int.from_bytes(bytes(paramData), byteorder="little", signed=False)
    elif DataType == DDCPDataTypes.DT_UNSIGNED16:
        return int.from_bytes(bytes(paramData), byteorder="little", signed=False)
    elif DataType == DDCPDataTypes.DT_UNSIGNED32:
        return int.from_bytes(bytes(paramData), byteorder="little", signed=False)
    elif DataType == DDCPDataTypes.DT_FLOATINGPOINT:
        pass
    elif DataType == DDCPDataTypes.DT_VISIBLE_STRING:
        return str(bytes(paramData), encoding="utf-8")
    elif DataType == DDCPDataTypes.DT_BYTE_STRING:
        return bytes(paramData)
        pass
    elif DataType == DDCPDataTypes.DT_TIMEOFDAY:
        pass
    elif DataType == DDCPDataTypes.DT_TIME_DIFFERENCE:
        pass
    elif DataType == DDCPDataTypes.DT_NORM_VALUE_N2:
        pass
    elif DataType == DDCPDataTypes.DT_NORM_VALUE_N4:
        pass
    elif DataType == DDCPDataTypes.DT_BIT_SEQUENCE:
        pass
    elif DataType == DDCPDataTypes.DT_DATE:
        pass
    elif DataType == DDCPDataTypes.DT_TIMEOFDAY_WITHOUT_DATE:
        pass
    elif DataType == DDCPDataTypes.DT_TIME_DIFFERENCE_WITH_DATE_INDICATION:
        pass
    elif DataType == DDCPDataTypes.DT_TIME_DIFFERENCE_WITHOUT_DATE_INDICATION:
        pass
    elif DataType == DDCPDataTypes.DT_BYTE:
        return int.from_bytes(bytes(paramData), byteorder="little", signed=False)
    elif DataType == DDCPDataTypes.DT_WORD:
        pass
    elif DataType == DDCPDataTypes.DT_DOUBLE_WORD:
        pass
    elif DataType == DDCPDataTypes.DT_ERROR:
        return int.from_bytes(bytes(paramData), byteorder="little", signed=False)

    return paramData