Module ddCommunication.protocols.LCPWirelessProtocol.WLCPWrapper.WLCPCommunication

Expand source code
from .SerialCommunication.SerialCommunication import SerialCommunication
from .WLCPTelegram import WLCPTelegram
from .WLCPPowerUpHandshake import OEM_LCP_ID
from .WLCPPowerUpHandshake import WLCPPowerUpHandshake
from .WLCPDDCPTelegram import WLCPDDCPTelegram
from .WLCPTelegramTypes import WLCPTelegramTypes
from axel import axel
from threading import *
from time import sleep


class WLCPCommunication:
    def __init__(self, COMPort, baud=115200, WLCPTelegramParseSleep=0.5):
        self.WLCPTelegramParseSleep = WLCPTelegramParseSleep
        self.TelegramReceived = axel.Event()
        self.ProbablyDisconnected = axel.Event()
        self.dataReceivedBytes = b''
        self.Baud = baud
        self.COMPort = COMPort
        self.__hasReceivedTelegram = True
        self.__timerIsStopped = True
        self.__firstPowerUpResponseGotten = None
        self.__timerDisconnect = None
        self.isRunning = False
        self.dataReceivedBytesLock = None
        self.WLCPTelegramParseThread = None
    def __bytesReceived(self, *args, **kwargs):
        if not self.__firstPowerUpResponseGotten:
            self.__firstPowerUpResponseGotten = True
        data = kwargs["Data"]
        self.dataReceivedBytesLock.acquire()
        self.dataReceivedBytes += data
        self.dataReceivedBytesLock.release()

    def __ParseWLCPTelegram(self):
        dataReceived = []
        while self.isRunning:
            self.dataReceivedBytesLock.acquire()
            for byte in self.dataReceivedBytes:
                dataReceived.append(byte)
            self.dataReceivedBytes = b''
            self.dataReceivedBytesLock.release()

            tryReadNextTel = True
            while tryReadNextTel:
                tryReadNextTel = False

                if len(dataReceived) < 1:
                    sleep(0.2)
                    continue

                while len(dataReceived) > 0 and dataReceived[0] != 0xE9:
                    dataReceived = dataReceived[1:]

                if len(dataReceived) < 2:
                    sleep(0.2)
                    continue

                telegramLength = dataReceived[1]
                if telegramLength <= len(dataReceived):
                    tryReadNextTel = True
                    self.__hasReceivedTelegram = True
                    telegram = dataReceived[0:telegramLength]
                    requestID = dataReceived[0]
                    telegramTypeByte = dataReceived[3]
                    data = dataReceived[4:telegramLength]
                    self.TelegramReceived(
                        Length=telegramLength,
                        RequestID=requestID,
                        Type=telegramTypeByte,
                        Data=data)
                    dataReceived = dataReceived[telegramLength:]

            sleep(self.WLCPTelegramParseSleep)

    def __startCheckDisconnectTimer(self):
        def timerCallback():
            if not self.__hasReceivedTelegram and self.isRunning:
                self.ProbablyDisconnected()
                return

            self.__hasReceivedTelegram = False
            if not self.__timerIsStopped:
                self.__timerDisconnect = Timer(5, timerCallback)
                self.__timerDisconnect.start()

        self.__timerIsStopped = False
        self.__timerDisconnect = Timer(5, timerCallback)
        self.__timerDisconnect.start()

    def __stopCheckDisconnectTimer(self):
        self.__timerIsStopped = True
        self.__timerDisconnect.cancel()

    def Start(self):
        """
        Starts the connection to the drive by sending powerup packages until a response has been sent back.
        """
        self.serial = SerialCommunication(self.COMPort, self.Baud)
        self.serial.BeginListening()
        self.serial.DataReceived += self.__bytesReceived
        self.__firstPowerUpResponseGotten = False
        self.isRunning = True
        self.dataReceivedBytesLock = Lock()
        self.dataReceivedBytes = b''
        self.WLCPTelegramParseThread = Thread(target=self.__ParseWLCPTelegram)
        self.WLCPTelegramParseThread.daemon = True
        self.WLCPTelegramParseThread.start()

        tel = WLCPPowerUpHandshake(OEM_LCP_ID.LCP_TEK, 2.1027)
        telegram = WLCPTelegram(0xff, WLCPTelegramTypes.PowerUpTelegram, tel)
        max_count = 0
        while not self.__firstPowerUpResponseGotten and max_count < 25:
            self.serial.AddToQueue(telegram.GetBytes())
            sleep(1.5)
            max_count += 1
        
        if not self.__firstPowerUpResponseGotten:
            raise Exception('Failed to get first power up response. Is drive software supporting DDCP communication?')

        self.serial.ClearQueue()
        sleep(0.3)
        tel = WLCPPowerUpHandshake(OEM_LCP_ID.LCP_BASIC_INTELLIFLO, 2.1027)
        telegram = WLCPTelegram(0xff, WLCPTelegramTypes.PowerUpTelegram, tel)

        self.serial.AddToQueue(telegram.GetBytes())
        self.serial.SetQueueBaseItem(WLCPTelegram(
            0xff, WLCPTelegramTypes.KeepAliveTelegram, None).GetBytes())
        self.serial.Started = True

        self.__startCheckDisconnectTimer()

    def Stop(self):
        """
        Stops the connection to the drive
        """
        self.isRunning = False
        self.serial.StopListening()
        self.serial.DataReceived -= self.__bytesReceived
        self.__firstPowerUpResponseGotten = False
        self.dataReceivedBytesLock.acquire()
        self.dataReceivedBytes = b''
        self.dataReceivedBytesLock.release()
        self.__stopCheckDisconnectTimer()
        while self.WLCPTelegramParseThread.is_alive():
            sleep(0.2)

    def SendWLCPTelegram(self, telegram: WLCPTelegram):
        """
        Sends a single wlcp telegram
        telegram should be an instance of WLCPTelegram
        """
        byt = telegram.GetBytes()
        self.serial.AddToQueue(telegram.GetBytes())

    def SendDDCPTelegram(self, telegram):
        """
        Takes a DDCP telegram and wraps it in a WLCP telegram
        (multiple WLCP telegrams if DDCP telegram is larger than 256 bytes)
        request id should be the same as the ddcp id.
        Should be an instance of DDCPTelegram
        """
        b = telegram.GetBytes()
        if len(b) < 256:
            tel = WLCPDDCPTelegram(b)
            self.SendWLCPTelegram(WLCPTelegram(
                telegram.RequestID, WLCPTelegramTypes.DDCPTelegram, tel))
        else:
            for i in range(0, len(b), 250):
                tel = WLCPDDCPTelegram(b[i:i + 250])
                # TODO!: test that multiple packets work
                self.SendWLCPTelegram(WLCPTelegram(
                    telegram.RequestID, WLCPTelegramTypes.DDCPTelegram, tel))

Classes

class WLCPCommunication (COMPort, baud=115200, WLCPTelegramParseSleep=0.5)
Expand source code
class WLCPCommunication:
    def __init__(self, COMPort, baud=115200, WLCPTelegramParseSleep=0.5):
        self.WLCPTelegramParseSleep = WLCPTelegramParseSleep
        self.TelegramReceived = axel.Event()
        self.ProbablyDisconnected = axel.Event()
        self.dataReceivedBytes = b''
        self.Baud = baud
        self.COMPort = COMPort
        self.__hasReceivedTelegram = True
        self.__timerIsStopped = True
        self.__firstPowerUpResponseGotten = None
        self.__timerDisconnect = None
        self.isRunning = False
        self.dataReceivedBytesLock = None
        self.WLCPTelegramParseThread = None
    def __bytesReceived(self, *args, **kwargs):
        if not self.__firstPowerUpResponseGotten:
            self.__firstPowerUpResponseGotten = True
        data = kwargs["Data"]
        self.dataReceivedBytesLock.acquire()
        self.dataReceivedBytes += data
        self.dataReceivedBytesLock.release()

    def __ParseWLCPTelegram(self):
        dataReceived = []
        while self.isRunning:
            self.dataReceivedBytesLock.acquire()
            for byte in self.dataReceivedBytes:
                dataReceived.append(byte)
            self.dataReceivedBytes = b''
            self.dataReceivedBytesLock.release()

            tryReadNextTel = True
            while tryReadNextTel:
                tryReadNextTel = False

                if len(dataReceived) < 1:
                    sleep(0.2)
                    continue

                while len(dataReceived) > 0 and dataReceived[0] != 0xE9:
                    dataReceived = dataReceived[1:]

                if len(dataReceived) < 2:
                    sleep(0.2)
                    continue

                telegramLength = dataReceived[1]
                if telegramLength <= len(dataReceived):
                    tryReadNextTel = True
                    self.__hasReceivedTelegram = True
                    telegram = dataReceived[0:telegramLength]
                    requestID = dataReceived[0]
                    telegramTypeByte = dataReceived[3]
                    data = dataReceived[4:telegramLength]
                    self.TelegramReceived(
                        Length=telegramLength,
                        RequestID=requestID,
                        Type=telegramTypeByte,
                        Data=data)
                    dataReceived = dataReceived[telegramLength:]

            sleep(self.WLCPTelegramParseSleep)

    def __startCheckDisconnectTimer(self):
        def timerCallback():
            if not self.__hasReceivedTelegram and self.isRunning:
                self.ProbablyDisconnected()
                return

            self.__hasReceivedTelegram = False
            if not self.__timerIsStopped:
                self.__timerDisconnect = Timer(5, timerCallback)
                self.__timerDisconnect.start()

        self.__timerIsStopped = False
        self.__timerDisconnect = Timer(5, timerCallback)
        self.__timerDisconnect.start()

    def __stopCheckDisconnectTimer(self):
        self.__timerIsStopped = True
        self.__timerDisconnect.cancel()

    def Start(self):
        """
        Starts the connection to the drive by sending powerup packages until a response has been sent back.
        """
        self.serial = SerialCommunication(self.COMPort, self.Baud)
        self.serial.BeginListening()
        self.serial.DataReceived += self.__bytesReceived
        self.__firstPowerUpResponseGotten = False
        self.isRunning = True
        self.dataReceivedBytesLock = Lock()
        self.dataReceivedBytes = b''
        self.WLCPTelegramParseThread = Thread(target=self.__ParseWLCPTelegram)
        self.WLCPTelegramParseThread.daemon = True
        self.WLCPTelegramParseThread.start()

        tel = WLCPPowerUpHandshake(OEM_LCP_ID.LCP_TEK, 2.1027)
        telegram = WLCPTelegram(0xff, WLCPTelegramTypes.PowerUpTelegram, tel)
        max_count = 0
        while not self.__firstPowerUpResponseGotten and max_count < 25:
            self.serial.AddToQueue(telegram.GetBytes())
            sleep(1.5)
            max_count += 1
        
        if not self.__firstPowerUpResponseGotten:
            raise Exception('Failed to get first power up response. Is drive software supporting DDCP communication?')

        self.serial.ClearQueue()
        sleep(0.3)
        tel = WLCPPowerUpHandshake(OEM_LCP_ID.LCP_BASIC_INTELLIFLO, 2.1027)
        telegram = WLCPTelegram(0xff, WLCPTelegramTypes.PowerUpTelegram, tel)

        self.serial.AddToQueue(telegram.GetBytes())
        self.serial.SetQueueBaseItem(WLCPTelegram(
            0xff, WLCPTelegramTypes.KeepAliveTelegram, None).GetBytes())
        self.serial.Started = True

        self.__startCheckDisconnectTimer()

    def Stop(self):
        """
        Stops the connection to the drive
        """
        self.isRunning = False
        self.serial.StopListening()
        self.serial.DataReceived -= self.__bytesReceived
        self.__firstPowerUpResponseGotten = False
        self.dataReceivedBytesLock.acquire()
        self.dataReceivedBytes = b''
        self.dataReceivedBytesLock.release()
        self.__stopCheckDisconnectTimer()
        while self.WLCPTelegramParseThread.is_alive():
            sleep(0.2)

    def SendWLCPTelegram(self, telegram: WLCPTelegram):
        """
        Sends a single wlcp telegram
        telegram should be an instance of WLCPTelegram
        """
        byt = telegram.GetBytes()
        self.serial.AddToQueue(telegram.GetBytes())

    def SendDDCPTelegram(self, telegram):
        """
        Takes a DDCP telegram and wraps it in a WLCP telegram
        (multiple WLCP telegrams if DDCP telegram is larger than 256 bytes)
        request id should be the same as the ddcp id.
        Should be an instance of DDCPTelegram
        """
        b = telegram.GetBytes()
        if len(b) < 256:
            tel = WLCPDDCPTelegram(b)
            self.SendWLCPTelegram(WLCPTelegram(
                telegram.RequestID, WLCPTelegramTypes.DDCPTelegram, tel))
        else:
            for i in range(0, len(b), 250):
                tel = WLCPDDCPTelegram(b[i:i + 250])
                # TODO!: test that multiple packets work
                self.SendWLCPTelegram(WLCPTelegram(
                    telegram.RequestID, WLCPTelegramTypes.DDCPTelegram, tel))

Methods

def SendDDCPTelegram(self, telegram)

Takes a DDCP telegram and wraps it in a WLCP telegram (multiple WLCP telegrams if DDCP telegram is larger than 256 bytes) request id should be the same as the ddcp id. Should be an instance of DDCPTelegram

Expand source code
def SendDDCPTelegram(self, telegram):
    """
    Takes a DDCP telegram and wraps it in a WLCP telegram
    (multiple WLCP telegrams if DDCP telegram is larger than 256 bytes)
    request id should be the same as the ddcp id.
    Should be an instance of DDCPTelegram
    """
    b = telegram.GetBytes()
    if len(b) < 256:
        tel = WLCPDDCPTelegram(b)
        self.SendWLCPTelegram(WLCPTelegram(
            telegram.RequestID, WLCPTelegramTypes.DDCPTelegram, tel))
    else:
        for i in range(0, len(b), 250):
            tel = WLCPDDCPTelegram(b[i:i + 250])
            # TODO!: test that multiple packets work
            self.SendWLCPTelegram(WLCPTelegram(
                telegram.RequestID, WLCPTelegramTypes.DDCPTelegram, tel))
def SendWLCPTelegram(self, telegram: WLCPTelegram)

Sends a single wlcp telegram telegram should be an instance of WLCPTelegram

Expand source code
def SendWLCPTelegram(self, telegram: WLCPTelegram):
    """
    Sends a single wlcp telegram
    telegram should be an instance of WLCPTelegram
    """
    byt = telegram.GetBytes()
    self.serial.AddToQueue(telegram.GetBytes())
def Start(self)

Starts the connection to the drive by sending powerup packages until a response has been sent back.

Expand source code
def Start(self):
    """
    Starts the connection to the drive by sending powerup packages until a response has been sent back.
    """
    self.serial = SerialCommunication(self.COMPort, self.Baud)
    self.serial.BeginListening()
    self.serial.DataReceived += self.__bytesReceived
    self.__firstPowerUpResponseGotten = False
    self.isRunning = True
    self.dataReceivedBytesLock = Lock()
    self.dataReceivedBytes = b''
    self.WLCPTelegramParseThread = Thread(target=self.__ParseWLCPTelegram)
    self.WLCPTelegramParseThread.daemon = True
    self.WLCPTelegramParseThread.start()

    tel = WLCPPowerUpHandshake(OEM_LCP_ID.LCP_TEK, 2.1027)
    telegram = WLCPTelegram(0xff, WLCPTelegramTypes.PowerUpTelegram, tel)
    max_count = 0
    while not self.__firstPowerUpResponseGotten and max_count < 25:
        self.serial.AddToQueue(telegram.GetBytes())
        sleep(1.5)
        max_count += 1
    
    if not self.__firstPowerUpResponseGotten:
        raise Exception('Failed to get first power up response. Is drive software supporting DDCP communication?')

    self.serial.ClearQueue()
    sleep(0.3)
    tel = WLCPPowerUpHandshake(OEM_LCP_ID.LCP_BASIC_INTELLIFLO, 2.1027)
    telegram = WLCPTelegram(0xff, WLCPTelegramTypes.PowerUpTelegram, tel)

    self.serial.AddToQueue(telegram.GetBytes())
    self.serial.SetQueueBaseItem(WLCPTelegram(
        0xff, WLCPTelegramTypes.KeepAliveTelegram, None).GetBytes())
    self.serial.Started = True

    self.__startCheckDisconnectTimer()
def Stop(self)

Stops the connection to the drive

Expand source code
def Stop(self):
    """
    Stops the connection to the drive
    """
    self.isRunning = False
    self.serial.StopListening()
    self.serial.DataReceived -= self.__bytesReceived
    self.__firstPowerUpResponseGotten = False
    self.dataReceivedBytesLock.acquire()
    self.dataReceivedBytes = b''
    self.dataReceivedBytesLock.release()
    self.__stopCheckDisconnectTimer()
    while self.WLCPTelegramParseThread.is_alive():
        sleep(0.2)