Module ddCommunication.protocols.LCPWirelessProtocol.WLCPWrapper.WLCPCommunication
Expand source code
from .SerialCommunication.SerialCommunication import SerialCommunication
from .WLCPTelegram import WLCPTelegram
from .WLCPPowerUpHandshake import OEM_LCP_ID
from .WLCPPowerUpHandshake import WLCPPowerUpHandshake
from .WLCPDDCPTelegram import WLCPDDCPTelegram
from .WLCPTelegramTypes import WLCPTelegramTypes
from axel import axel
from threading import *
from time import sleep
class WLCPCommunication:
    """WLCP link to a drive over a serial port.

    Start() opens the port, performs the power-up handshake and launches a
    background thread that cuts the incoming byte stream into telegrams.

    Events:
        TelegramReceived: fired from the parse thread for each complete
            telegram with keyword arguments ``Length``, ``RequestID``,
            ``Type`` and ``Data`` (a list of ints).
        ProbablyDisconnected: fired from a timer thread when a full 5-second
            watchdog period passes without any telegram while running.
    """

    # Every telegram begins with this byte; used to resynchronise the stream.
    _START_BYTE = 0xE9
    # The parser reads bytes 0..3 (start, length, byte 2, type), so a length
    # field below 4 cannot describe a usable telegram.
    _MIN_TELEGRAM_LENGTH = 4

    def __init__(self, COMPort, baud=115200, WLCPTelegramParseSleep=0.5):
        """Store the connection settings; nothing is opened until Start().

        Args:
            COMPort: Port identifier handed to SerialCommunication.
            baud: Baud rate handed to SerialCommunication.
            WLCPTelegramParseSleep: Seconds the parse thread sleeps between
                passes over the receive buffer.
        """
        self.WLCPTelegramParseSleep = WLCPTelegramParseSleep
        self.TelegramReceived = axel.Event()
        self.ProbablyDisconnected = axel.Event()
        self.dataReceivedBytes = b''
        self.Baud = baud
        self.COMPort = COMPort
        # Created by Start(); None lets Stop() detect "never started".
        self.serial = None
        self.__hasReceivedTelegram = True
        self.__timerIsStopped = True
        self.__firstPowerUpResponseGotten = None
        self.__timerDisconnect = None
        self.isRunning = False
        self.dataReceivedBytesLock = None
        self.WLCPTelegramParseThread = None

    def __bytesReceived(self, *args, **kwargs):
        """Serial ``DataReceived`` handler: buffer the bytes passed as ``Data``."""
        # Any incoming bytes are treated as the answer to the power-up
        # handshake that Start() is waiting for.
        self.__firstPowerUpResponseGotten = True
        data = kwargs["Data"]
        with self.dataReceivedBytesLock:
            self.dataReceivedBytes += data

    def __dispatchNextTelegram(self, dataReceived):
        """Fire TelegramReceived for the first complete telegram, if any.

        ``dataReceived`` (a list of ints) is modified in place: bytes in
        front of the next start byte are discarded and a dispatched telegram
        is removed.

        Returns:
            True when a telegram was dispatched, False when more bytes are
            needed before anything else can be parsed.
        """
        while True:
            # Resynchronise on the next start byte, dropping leading garbage.
            try:
                start = dataReceived.index(self._START_BYTE)
            except ValueError:
                del dataReceived[:]
                return False
            del dataReceived[:start]

            if len(dataReceived) < 2:
                return False

            # The length byte counts the whole telegram, start byte included.
            telegramLength = dataReceived[1]
            if telegramLength >= self._MIN_TELEGRAM_LENGTH:
                break
            # A length of 0 would never consume any bytes (endless loop) and
            # 1..3 would index past the telegram; treat this start byte as
            # noise and look for the next one.
            del dataReceived[0]

        if telegramLength > len(dataReceived):
            return False

        self.__hasReceivedTelegram = True
        # NOTE(review): this is always the start byte (0xE9). The request id
        # is presumably byte 2 -- confirm against the WLCP frame layout
        # before changing, since handlers currently receive this value.
        requestID = dataReceived[0]
        telegramTypeByte = dataReceived[3]
        data = dataReceived[4:telegramLength]
        self.TelegramReceived(
            Length=telegramLength,
            RequestID=requestID,
            Type=telegramTypeByte,
            Data=data)
        del dataReceived[:telegramLength]
        return True

    def __ParseWLCPTelegram(self):
        """Parse-thread body: split buffered bytes into telegrams until stopped."""
        dataReceived = []
        while self.isRunning:
            # Move everything the serial handler collected into the local
            # buffer so the lock is held only briefly.
            with self.dataReceivedBytesLock:
                dataReceived.extend(self.dataReceivedBytes)
                self.dataReceivedBytes = b''

            while self.__dispatchNextTelegram(dataReceived):
                pass

            if not self.isRunning:
                # Stop() was requested; exit without the idle sleeps.
                break
            if len(dataReceived) < 2:
                # Nothing parseable is buffered: back off a little extra.
                sleep(0.2)
            sleep(self.WLCPTelegramParseSleep)

    def __startCheckDisconnectTimer(self):
        """Start the watchdog that checks every 5 seconds for received telegrams."""
        def schedule():
            self.__timerDisconnect = Timer(5, timerCallback)
            self.__timerDisconnect.start()

        def timerCallback():
            if not self.__hasReceivedTelegram and self.isRunning:
                # A whole period without telegrams: report it and stop
                # re-arming the watchdog.
                self.ProbablyDisconnected()
                return
            self.__hasReceivedTelegram = False
            if not self.__timerIsStopped:
                schedule()

        self.__timerIsStopped = False
        schedule()

    def __stopCheckDisconnectTimer(self):
        """Stop the watchdog; a no-op when it was never started."""
        self.__timerIsStopped = True
        if self.__timerDisconnect is not None:
            self.__timerDisconnect.cancel()

    def Start(self):
        """
        Starts the connection to the drive by sending power-up telegrams
        until a response is received.

        The LCP_TEK handshake is queued up to 25 times, 1.5 seconds apart,
        until any bytes arrive. Then the queue is cleared, the
        LCP_BASIC_INTELLIFLO handshake is sent, a keep-alive telegram is
        registered through SetQueueBaseItem and the disconnect watchdog
        is started.

        Raises:
            Exception: no bytes were received during the handshake. The
                serial listener and the parse thread are left running in
                that case; call Stop() to release them.
        """
        self.serial = SerialCommunication(self.COMPort, self.Baud)

        # Prepare all receive state before the handler is subscribed, so a
        # byte arriving immediately never finds a missing lock.
        self.__firstPowerUpResponseGotten = False
        self.dataReceivedBytesLock = Lock()
        self.dataReceivedBytes = b''
        self.isRunning = True

        self.serial.BeginListening()
        self.serial.DataReceived += self.__bytesReceived

        self.WLCPTelegramParseThread = Thread(target=self.__ParseWLCPTelegram)
        self.WLCPTelegramParseThread.daemon = True
        self.WLCPTelegramParseThread.start()

        tel = WLCPPowerUpHandshake(OEM_LCP_ID.LCP_TEK, 2.1027)
        telegram = WLCPTelegram(0xff, WLCPTelegramTypes.PowerUpTelegram, tel)
        for _ in range(25):
            if self.__firstPowerUpResponseGotten:
                break
            self.serial.AddToQueue(telegram.GetBytes())
            sleep(1.5)
        if not self.__firstPowerUpResponseGotten:
            raise Exception('Failed to get first power up response. Is drive software supporting DDCP communication?')

        self.serial.ClearQueue()
        sleep(0.3)
        tel = WLCPPowerUpHandshake(OEM_LCP_ID.LCP_BASIC_INTELLIFLO, 2.1027)
        telegram = WLCPTelegram(0xff, WLCPTelegramTypes.PowerUpTelegram, tel)
        self.serial.AddToQueue(telegram.GetBytes())
        # Presumably the base item is what the serial layer sends whenever
        # its queue is otherwise empty -- confirm in SerialCommunication.
        self.serial.SetQueueBaseItem(WLCPTelegram(
            0xff, WLCPTelegramTypes.KeepAliveTelegram, None).GetBytes())
        self.serial.Started = True
        self.__startCheckDisconnectTimer()

    def Stop(self):
        """
        Stops the connection to the drive.

        Safe to call when Start() was never called or failed part-way:
        pieces that were not created are skipped.
        """
        self.isRunning = False
        if self.serial is not None:
            self.serial.StopListening()
            self.serial.DataReceived -= self.__bytesReceived
        self.__firstPowerUpResponseGotten = False
        if self.dataReceivedBytesLock is not None:
            with self.dataReceivedBytesLock:
                self.dataReceivedBytes = b''
        self.__stopCheckDisconnectTimer()

        parseThread = self.WLCPTelegramParseThread
        # A TelegramReceived handler runs on the parse thread; if it calls
        # Stop() there is nothing to wait for (a thread cannot join itself).
        if parseThread is not None and parseThread is not current_thread():
            parseThread.join()

    def SendWLCPTelegram(self, telegram: "WLCPTelegram"):
        """
        Sends a single WLCP telegram.

        telegram should be an instance of WLCPTelegram; its GetBytes()
        result is appended to the serial send queue.
        """
        self.serial.AddToQueue(telegram.GetBytes())

    def SendDDCPTelegram(self, telegram):
        """
        Takes a DDCP telegram and wraps it in a WLCP telegram.

        A payload shorter than 256 bytes is sent as one WLCP telegram;
        anything longer is split into consecutive chunks of at most 250
        bytes, one WLCP telegram per chunk. Every WLCP telegram carries the
        DDCP telegram's RequestID.

        telegram should be an instance of DDCPTelegram.
        """
        payload = telegram.GetBytes()
        if len(payload) < 256:
            chunks = [payload]
        else:
            # TODO!: test that multiple packets work
            chunks = [payload[i:i + 250] for i in range(0, len(payload), 250)]
        for chunk in chunks:
            tel = WLCPDDCPTelegram(chunk)
            self.SendWLCPTelegram(WLCPTelegram(
                telegram.RequestID, WLCPTelegramTypes.DDCPTelegram, tel))
Classes
class WLCPCommunication (COMPort, baud=115200, WLCPTelegramParseSleep=0.5)
-
Expand source code
class WLCPCommunication:
    """WLCP link to a drive over a serial port.

    Start() opens the port, performs the power-up handshake and launches a
    background thread that cuts the incoming byte stream into telegrams.

    Events:
        TelegramReceived: fired from the parse thread for each complete
            telegram with keyword arguments ``Length``, ``RequestID``,
            ``Type`` and ``Data`` (a list of ints).
        ProbablyDisconnected: fired from a timer thread when a full 5-second
            watchdog period passes without any telegram while running.
    """

    # Every telegram begins with this byte; used to resynchronise the stream.
    _START_BYTE = 0xE9
    # The parser reads bytes 0..3 (start, length, byte 2, type), so a length
    # field below 4 cannot describe a usable telegram.
    _MIN_TELEGRAM_LENGTH = 4

    def __init__(self, COMPort, baud=115200, WLCPTelegramParseSleep=0.5):
        """Store the connection settings; nothing is opened until Start().

        Args:
            COMPort: Port identifier handed to SerialCommunication.
            baud: Baud rate handed to SerialCommunication.
            WLCPTelegramParseSleep: Seconds the parse thread sleeps between
                passes over the receive buffer.
        """
        self.WLCPTelegramParseSleep = WLCPTelegramParseSleep
        self.TelegramReceived = axel.Event()
        self.ProbablyDisconnected = axel.Event()
        self.dataReceivedBytes = b''
        self.Baud = baud
        self.COMPort = COMPort
        # Created by Start(); None lets Stop() detect "never started".
        self.serial = None
        self.__hasReceivedTelegram = True
        self.__timerIsStopped = True
        self.__firstPowerUpResponseGotten = None
        self.__timerDisconnect = None
        self.isRunning = False
        self.dataReceivedBytesLock = None
        self.WLCPTelegramParseThread = None

    def __bytesReceived(self, *args, **kwargs):
        """Serial ``DataReceived`` handler: buffer the bytes passed as ``Data``."""
        # Any incoming bytes are treated as the answer to the power-up
        # handshake that Start() is waiting for.
        self.__firstPowerUpResponseGotten = True
        data = kwargs["Data"]
        with self.dataReceivedBytesLock:
            self.dataReceivedBytes += data

    def __dispatchNextTelegram(self, dataReceived):
        """Fire TelegramReceived for the first complete telegram, if any.

        ``dataReceived`` (a list of ints) is modified in place: bytes in
        front of the next start byte are discarded and a dispatched telegram
        is removed.

        Returns:
            True when a telegram was dispatched, False when more bytes are
            needed before anything else can be parsed.
        """
        while True:
            # Resynchronise on the next start byte, dropping leading garbage.
            try:
                start = dataReceived.index(self._START_BYTE)
            except ValueError:
                del dataReceived[:]
                return False
            del dataReceived[:start]

            if len(dataReceived) < 2:
                return False

            # The length byte counts the whole telegram, start byte included.
            telegramLength = dataReceived[1]
            if telegramLength >= self._MIN_TELEGRAM_LENGTH:
                break
            # A length of 0 would never consume any bytes (endless loop) and
            # 1..3 would index past the telegram; treat this start byte as
            # noise and look for the next one.
            del dataReceived[0]

        if telegramLength > len(dataReceived):
            return False

        self.__hasReceivedTelegram = True
        # NOTE(review): this is always the start byte (0xE9). The request id
        # is presumably byte 2 -- confirm against the WLCP frame layout
        # before changing, since handlers currently receive this value.
        requestID = dataReceived[0]
        telegramTypeByte = dataReceived[3]
        data = dataReceived[4:telegramLength]
        self.TelegramReceived(
            Length=telegramLength,
            RequestID=requestID,
            Type=telegramTypeByte,
            Data=data)
        del dataReceived[:telegramLength]
        return True

    def __ParseWLCPTelegram(self):
        """Parse-thread body: split buffered bytes into telegrams until stopped."""
        dataReceived = []
        while self.isRunning:
            # Move everything the serial handler collected into the local
            # buffer so the lock is held only briefly.
            with self.dataReceivedBytesLock:
                dataReceived.extend(self.dataReceivedBytes)
                self.dataReceivedBytes = b''

            while self.__dispatchNextTelegram(dataReceived):
                pass

            if not self.isRunning:
                # Stop() was requested; exit without the idle sleeps.
                break
            if len(dataReceived) < 2:
                # Nothing parseable is buffered: back off a little extra.
                sleep(0.2)
            sleep(self.WLCPTelegramParseSleep)

    def __startCheckDisconnectTimer(self):
        """Start the watchdog that checks every 5 seconds for received telegrams."""
        def schedule():
            self.__timerDisconnect = Timer(5, timerCallback)
            self.__timerDisconnect.start()

        def timerCallback():
            if not self.__hasReceivedTelegram and self.isRunning:
                # A whole period without telegrams: report it and stop
                # re-arming the watchdog.
                self.ProbablyDisconnected()
                return
            self.__hasReceivedTelegram = False
            if not self.__timerIsStopped:
                schedule()

        self.__timerIsStopped = False
        schedule()

    def __stopCheckDisconnectTimer(self):
        """Stop the watchdog; a no-op when it was never started."""
        self.__timerIsStopped = True
        if self.__timerDisconnect is not None:
            self.__timerDisconnect.cancel()

    def Start(self):
        """
        Starts the connection to the drive by sending power-up telegrams
        until a response is received.

        The LCP_TEK handshake is queued up to 25 times, 1.5 seconds apart,
        until any bytes arrive. Then the queue is cleared, the
        LCP_BASIC_INTELLIFLO handshake is sent, a keep-alive telegram is
        registered through SetQueueBaseItem and the disconnect watchdog
        is started.

        Raises:
            Exception: no bytes were received during the handshake. The
                serial listener and the parse thread are left running in
                that case; call Stop() to release them.
        """
        self.serial = SerialCommunication(self.COMPort, self.Baud)

        # Prepare all receive state before the handler is subscribed, so a
        # byte arriving immediately never finds a missing lock.
        self.__firstPowerUpResponseGotten = False
        self.dataReceivedBytesLock = Lock()
        self.dataReceivedBytes = b''
        self.isRunning = True

        self.serial.BeginListening()
        self.serial.DataReceived += self.__bytesReceived

        self.WLCPTelegramParseThread = Thread(target=self.__ParseWLCPTelegram)
        self.WLCPTelegramParseThread.daemon = True
        self.WLCPTelegramParseThread.start()

        tel = WLCPPowerUpHandshake(OEM_LCP_ID.LCP_TEK, 2.1027)
        telegram = WLCPTelegram(0xff, WLCPTelegramTypes.PowerUpTelegram, tel)
        for _ in range(25):
            if self.__firstPowerUpResponseGotten:
                break
            self.serial.AddToQueue(telegram.GetBytes())
            sleep(1.5)
        if not self.__firstPowerUpResponseGotten:
            raise Exception('Failed to get first power up response. Is drive software supporting DDCP communication?')

        self.serial.ClearQueue()
        sleep(0.3)
        tel = WLCPPowerUpHandshake(OEM_LCP_ID.LCP_BASIC_INTELLIFLO, 2.1027)
        telegram = WLCPTelegram(0xff, WLCPTelegramTypes.PowerUpTelegram, tel)
        self.serial.AddToQueue(telegram.GetBytes())
        # Presumably the base item is what the serial layer sends whenever
        # its queue is otherwise empty -- confirm in SerialCommunication.
        self.serial.SetQueueBaseItem(WLCPTelegram(
            0xff, WLCPTelegramTypes.KeepAliveTelegram, None).GetBytes())
        self.serial.Started = True
        self.__startCheckDisconnectTimer()

    def Stop(self):
        """
        Stops the connection to the drive.

        Safe to call when Start() was never called or failed part-way:
        pieces that were not created are skipped.
        """
        self.isRunning = False
        if self.serial is not None:
            self.serial.StopListening()
            self.serial.DataReceived -= self.__bytesReceived
        self.__firstPowerUpResponseGotten = False
        if self.dataReceivedBytesLock is not None:
            with self.dataReceivedBytesLock:
                self.dataReceivedBytes = b''
        self.__stopCheckDisconnectTimer()

        parseThread = self.WLCPTelegramParseThread
        # A TelegramReceived handler runs on the parse thread; if it calls
        # Stop() there is nothing to wait for (a thread cannot join itself).
        if parseThread is not None and parseThread is not current_thread():
            parseThread.join()

    def SendWLCPTelegram(self, telegram: "WLCPTelegram"):
        """
        Sends a single WLCP telegram.

        telegram should be an instance of WLCPTelegram; its GetBytes()
        result is appended to the serial send queue.
        """
        self.serial.AddToQueue(telegram.GetBytes())

    def SendDDCPTelegram(self, telegram):
        """
        Takes a DDCP telegram and wraps it in a WLCP telegram.

        A payload shorter than 256 bytes is sent as one WLCP telegram;
        anything longer is split into consecutive chunks of at most 250
        bytes, one WLCP telegram per chunk. Every WLCP telegram carries the
        DDCP telegram's RequestID.

        telegram should be an instance of DDCPTelegram.
        """
        payload = telegram.GetBytes()
        if len(payload) < 256:
            chunks = [payload]
        else:
            # TODO!: test that multiple packets work
            chunks = [payload[i:i + 250] for i in range(0, len(payload), 250)]
        for chunk in chunks:
            tel = WLCPDDCPTelegram(chunk)
            self.SendWLCPTelegram(WLCPTelegram(
                telegram.RequestID, WLCPTelegramTypes.DDCPTelegram, tel))
Methods
def SendDDCPTelegram(self, telegram)
-
Takes a DDCP telegram and wraps it in a WLCP telegram. A payload of 256 bytes or more is split across multiple WLCP telegrams carrying at most 250 bytes each. The WLCP request ID is the DDCP telegram's RequestID. The argument should be an instance of DDCPTelegram.
Expand source code
def SendDDCPTelegram(self, telegram):
    """
    Takes a DDCP telegram and wraps it in a WLCP telegram.

    A payload shorter than 256 bytes is sent as one WLCP telegram;
    anything longer is split into consecutive chunks of at most 250 bytes,
    one WLCP telegram per chunk. Every WLCP telegram carries the DDCP
    telegram's RequestID.

    telegram should be an instance of DDCPTelegram.
    """
    payload = telegram.GetBytes()
    if len(payload) < 256:
        chunks = [payload]
    else:
        # TODO!: test that multiple packets work
        chunks = [payload[i:i + 250] for i in range(0, len(payload), 250)]
    for chunk in chunks:
        wrapped = WLCPDDCPTelegram(chunk)
        self.SendWLCPTelegram(WLCPTelegram(
            telegram.RequestID, WLCPTelegramTypes.DDCPTelegram, wrapped))
def SendWLCPTelegram(self, telegram: WLCPTelegram)
-
Sends a single WLCP telegram. The argument should be an instance of WLCPTelegram.
Expand source code
def SendWLCPTelegram(self, telegram: "WLCPTelegram"):
    """
    Sends a single WLCP telegram.

    telegram should be an instance of WLCPTelegram; its GetBytes() result
    is appended to the serial send queue.
    """
    # Serialise once; the earlier version built the bytes twice and threw
    # the first copy away.
    self.serial.AddToQueue(telegram.GetBytes())
def Start(self)
-
Starts the connection to the drive by sending power-up telegrams until a response is received.
Expand source code
def Start(self):
    """
    Starts the connection to the drive by sending power-up telegrams
    until a response is received.

    The LCP_TEK handshake is queued up to 25 times, 1.5 seconds apart,
    until any bytes arrive. Then the queue is cleared, the
    LCP_BASIC_INTELLIFLO handshake is sent, a keep-alive telegram is
    registered through SetQueueBaseItem and the disconnect watchdog is
    started.

    Raises:
        Exception: no bytes were received during the handshake. The serial
            listener and the parse thread are left running in that case.
    """
    self.serial = SerialCommunication(self.COMPort, self.Baud)

    # Prepare all receive state before the handler is subscribed, so a byte
    # arriving immediately never finds a missing lock.
    self.__firstPowerUpResponseGotten = False
    self.dataReceivedBytesLock = Lock()
    self.dataReceivedBytes = b''
    self.isRunning = True

    self.serial.BeginListening()
    self.serial.DataReceived += self.__bytesReceived

    self.WLCPTelegramParseThread = Thread(target=self.__ParseWLCPTelegram)
    self.WLCPTelegramParseThread.daemon = True
    self.WLCPTelegramParseThread.start()

    tel = WLCPPowerUpHandshake(OEM_LCP_ID.LCP_TEK, 2.1027)
    telegram = WLCPTelegram(0xff, WLCPTelegramTypes.PowerUpTelegram, tel)
    for _ in range(25):
        if self.__firstPowerUpResponseGotten:
            break
        self.serial.AddToQueue(telegram.GetBytes())
        sleep(1.5)
    if not self.__firstPowerUpResponseGotten:
        raise Exception('Failed to get first power up response. Is drive software supporting DDCP communication?')

    self.serial.ClearQueue()
    sleep(0.3)
    tel = WLCPPowerUpHandshake(OEM_LCP_ID.LCP_BASIC_INTELLIFLO, 2.1027)
    telegram = WLCPTelegram(0xff, WLCPTelegramTypes.PowerUpTelegram, tel)
    self.serial.AddToQueue(telegram.GetBytes())
    # Presumably the base item is what the serial layer sends whenever its
    # queue is otherwise empty -- confirm in SerialCommunication.
    self.serial.SetQueueBaseItem(WLCPTelegram(
        0xff, WLCPTelegramTypes.KeepAliveTelegram, None).GetBytes())
    self.serial.Started = True
    self.__startCheckDisconnectTimer()
def Stop(self)
-
Stops the connection to the drive
Expand source code
def Stop(self):
    """
    Stops the connection to the drive.

    Safe to call when Start() was never called or failed part-way: pieces
    that were not created are skipped.
    """
    self.isRunning = False

    serial = getattr(self, "serial", None)
    if serial is not None:
        self.serial.StopListening()
        self.serial.DataReceived -= self.__bytesReceived
    self.__firstPowerUpResponseGotten = False

    if self.dataReceivedBytesLock is not None:
        with self.dataReceivedBytesLock:
            self.dataReceivedBytes = b''

    # Stop the disconnect watchdog here, tolerating a timer that was never
    # created (Start() raises before starting it when the handshake fails).
    self.__timerIsStopped = True
    if self.__timerDisconnect is not None:
        self.__timerDisconnect.cancel()

    parseThread = self.WLCPTelegramParseThread
    # A TelegramReceived handler runs on the parse thread; if it calls
    # Stop() there is nothing to wait for (a thread cannot join itself).
    if parseThread is not None and parseThread is not current_thread():
        parseThread.join()