Module ddCommunication.protocols.LCPWirelessProtocol.DDCPCommunicator
Expand source code
from axel import axel
from threading import *
from time import sleep
from .WLCPWrapper.WLCPCommunication import WLCPCommunication
from .WLCPWrapper.WLCPTelegramTypes import WLCPTelegramTypes
from .DDCPTelegram import DDCPTelegram
from .HelperMethods import calculate_crc_arr
import time
class DDCPCommunicator:
    """Exchange DDCP telegrams with a drive through a WLCP connection.

    Bytes delivered by the WLCP layer are buffered and cut into DDCP
    telegrams. Every telegram that has been sent is remembered until a
    telegram with the same request ID and a valid CRC comes back, so that
    it can be resent by the watchdog or after a reconnect.

    Events (axel):
        DDCPTelegramReceived: fired for every telegram whose CRC matches,
            with keyword arguments RequestID, Length, MessageType,
            TypeHeader, Data and CRC.
        CorruptedDDCPTelegramReceived: fired with the same keyword
            arguments when the CRC does not match, or when the length
            field is too small to be a telegram (MessageType, TypeHeader
            and CRC are then None and Data holds the discarded bytes).
    """

    # Seconds between two runs of the disconnect watchdog.
    _WATCHDOG_INTERVAL = 10
    # Lengths below this cannot hold the header bytes the parser indexes
    # (request ID, two length bytes, message type, type header).
    _MIN_TELEGRAM_LENGTH = 5

    def __init__(self, COMPort, DDCPTelegramParseSleep=0.5, ReconnectAutomatically=True, maxTimeBeforePacketIsResent=10):
        """Create the communicator; nothing is started until Start().

        Args:
            COMPort: Passed unchanged to WLCPCommunication.
            DDCPTelegramParseSleep: Forwarded to WLCPCommunication as
                WLCPTelegramParseSleep.
            ReconnectAutomatically: When true, a suspected disconnect
                restarts the connection and resends pending telegrams.
            maxTimeBeforePacketIsResent: Age in seconds after which an
                unanswered telegram is resent; -1 disables resending.
        """
        self.DDCPTelegramReceived = axel.Event()
        self.CorruptedDDCPTelegramReceived = axel.Event()
        self.wlcp = WLCPCommunication(
            COMPort, WLCPTelegramParseSleep=DDCPTelegramParseSleep)
        self.wlcp.TelegramReceived += self.__telegramReceived
        # Received bytes that have not been cut into a telegram yet.
        self.telReceivedData = []
        # Request ID -> callback waiting for the answer.
        self.sendTelegramCallbacks = {}
        # Request ID -> {'Telegram', 'Callback', 'Time'} of unanswered sends.
        self.sendTelegramReconnect = {}
        self.ReconnectAutomatically = ReconnectAutomatically
        self.IsRestarting = False
        self.IsStopped = True
        self.receivedTelRecently = True
        self.MaxTimeBeforePacketIsResent = maxTimeBeforePacketIsResent
        self.CheckDisconnectTimer = None

    def __armDisconnectTimer(self):
        """Schedule the next watchdog run, replacing any timer already armed."""
        # A reconnect triggered from inside the watchdog arms a timer through
        # Start(); cancelling first keeps exactly one timer alive.
        if self.CheckDisconnectTimer is not None:
            self.CheckDisconnectTimer.cancel()
        self.CheckDisconnectTimer = Timer(
            self._WATCHDOG_INTERVAL, self.__isDisconnected)
        self.CheckDisconnectTimer.start()

    def __isDisconnected(self):
        """Watchdog body: detect silence, resend overdue telegrams, re-arm."""
        if not self.receivedTelRecently and self.sendTelegramReconnect:
            self.__probablyDisconnected()
        if self.MaxTimeBeforePacketIsResent != -1:
            self.__resendOverdueTelegrams(time.time())
        # Cleared here and set again by the next valid telegram.
        self.receivedTelRecently = False
        if not self.IsStopped:
            self.__armDisconnectTimer()

    def __resendOverdueTelegrams(self, now):
        """Resend telegrams older than the resend limit.

        A telegram older than twice the limit is treated as a sign of a
        lost connection instead and ends the scan.
        """
        limit = self.MaxTimeBeforePacketIsResent
        for key, entry in list(self.sendTelegramReconnect.items()):
            age = now - entry['Time']
            if age > 2 * limit:
                self.__probablyDisconnected()
                break
            if age > limit:
                self.SendDDCPTelegram(entry['Telegram'], entry['Callback'])
                # SendDDCPTelegram stamped the entry with the current time;
                # restore the first-send time so the 2x check above keeps
                # measuring from the original send.
                pending = self.sendTelegramReconnect.get(key)
                if pending is not None:
                    pending['Time'] = entry['Time']
                print("Resent telegram due to more than " +
                      str(self.MaxTimeBeforePacketIsResent) + " seconds has elapsed")

    def __probablyDisconnectedWLCP(self, *args, **kwargs):
        """Handler for the WLCP ProbablyDisconnected event."""
        self.__probablyDisconnected()

    def __probablyDisconnected(self):
        """Restart the connection and resend every unanswered telegram.

        Does nothing when automatic reconnect is disabled or a restart is
        already in progress.
        """
        if not self.ReconnectAutomatically or self.IsRestarting:
            return
        self.IsRestarting = True
        try:
            self.Stop()
            self.Start()
            pending = dict(self.sendTelegramReconnect)
            self.sendTelegramReconnect = {}
        finally:
            # Never leave the flag set: SendDDCPTelegram skips the actual
            # transmission while it is true.
            self.IsRestarting = False
        for entry in pending.values():
            sleep(0.3)
            self.SendDDCPTelegram(entry['Telegram'], entry['Callback'])

    def __telegramReceived(self, *args, **kwargs):
        """Buffer the payload of a WLCP telegram that carries DDCP data."""
        if WLCPTelegramTypes.DDCPTelegram == kwargs["Type"]:
            self.telReceivedData.extend(kwargs["Data"])
            self.__parseDDCPTelegram()

    def __parseDDCPTelegram(self):
        """Dispatch every complete telegram currently in the buffer."""
        while self.__parseNextTelegram():
            pass

    def __parseNextTelegram(self):
        """Cut one telegram from the buffer and dispatch it.

        Layout read here: byte 0 request ID, bytes 1-2 total length
        (little endian), byte 3 message type, byte 4 type header, then
        data, and the CRC as the last byte.

        Returns:
            True when a telegram was consumed, False when more bytes are
            needed or the buffer had to be discarded.
        """
        buffered = self.telReceivedData
        if len(buffered) < 3:
            return False
        telegramLength = int.from_bytes(buffered[1:3], byteorder="little")
        if telegramLength < self._MIN_TELEGRAM_LENGTH:
            # The length field cannot be trusted, so there is no way to find
            # the next telegram boundary; drop the buffer instead of failing
            # on the same bytes for every later receive.
            self.telReceivedData = []
            self.CorruptedDDCPTelegramReceived(
                RequestID=buffered[0], Length=telegramLength,
                MessageType=None, TypeHeader=None, Data=buffered, CRC=None)
            return False
        if telegramLength > len(buffered):
            return False

        telegram = buffered[:telegramLength]
        self.telReceivedData = buffered[telegramLength:]
        requestID = telegram[0]
        data = telegram[5:-1]
        crc = telegram[-1]
        fields = dict(RequestID=requestID, Length=telegramLength,
                      MessageType=telegram[3], TypeHeader=telegram[4],
                      Data=data, CRC=crc)

        if calculate_crc_arr(data) != crc:
            self.CorruptedDDCPTelegramReceived(**fields)
            return True

        # Clear the bookkeeping before running user code so a callback may
        # send a follow-up telegram that reuses the same request ID. The ID
        # may be unknown (unsolicited telegram, or cleared by a reconnect).
        callback = self.sendTelegramCallbacks.pop(requestID, None)
        self.sendTelegramReconnect.pop(requestID, None)
        self.receivedTelRecently = True
        if callback is not None:
            callback(**fields)
        self.DDCPTelegramReceived(**fields)
        return True

    def SendDDCPTelegram(self, Telegram: "DDCPTelegram", callback=None):
        """
        Sends a single DDCP telegram.

        The callback function will be called when the drive has sent a
        telegram back with the same request id. While a restart is in
        progress the telegram is only recorded as pending, not transmitted.
        """
        if callback is not None:
            self.sendTelegramCallbacks[Telegram.RequestID] = callback
        self.sendTelegramReconnect[Telegram.RequestID] = {
            'Telegram': Telegram, 'Callback': callback, 'Time': time.time()}
        if not self.IsRestarting:
            self.wlcp.SendDDCPTelegram(Telegram)

    def Start(self):
        """
        Starts the connection to the drive by sending powerup packages until
        a response has been sent back, and arms the disconnect watchdog.
        """
        self.IsStopped = False
        self.wlcp.Start()
        self.wlcp.ProbablyDisconnected += self.__probablyDisconnectedWLCP
        self.__armDisconnectTimer()

    def Stop(self):
        """Stop the watchdog and the WLCP connection.

        Safe to call before Start(): the timer is only cancelled if one
        has been created.
        """
        self.IsStopped = True
        if self.CheckDisconnectTimer is not None:
            self.CheckDisconnectTimer.cancel()
        self.wlcp.Stop()
        self.wlcp.ProbablyDisconnected -= self.__probablyDisconnectedWLCP
Classes
class DDCPCommunicator (COMPort, DDCPTelegramParseSleep=0.5, ReconnectAutomatically=True, maxTimeBeforePacketIsResent=10)
-
Expand source code
class DDCPCommunicator:
    """Exchange DDCP telegrams with a drive through a WLCP connection.

    Bytes delivered by the WLCP layer are buffered and cut into DDCP
    telegrams. Every telegram that has been sent is remembered until a
    telegram with the same request ID and a valid CRC comes back, so that
    it can be resent by the watchdog or after a reconnect.

    Events (axel):
        DDCPTelegramReceived: fired for every telegram whose CRC matches,
            with keyword arguments RequestID, Length, MessageType,
            TypeHeader, Data and CRC.
        CorruptedDDCPTelegramReceived: fired with the same keyword
            arguments when the CRC does not match, or when the length
            field is too small to be a telegram (MessageType, TypeHeader
            and CRC are then None and Data holds the discarded bytes).
    """

    # Seconds between two runs of the disconnect watchdog.
    _WATCHDOG_INTERVAL = 10
    # Lengths below this cannot hold the header bytes the parser indexes
    # (request ID, two length bytes, message type, type header).
    _MIN_TELEGRAM_LENGTH = 5

    def __init__(self, COMPort, DDCPTelegramParseSleep=0.5, ReconnectAutomatically=True, maxTimeBeforePacketIsResent=10):
        """Create the communicator; nothing is started until Start().

        Args:
            COMPort: Passed unchanged to WLCPCommunication.
            DDCPTelegramParseSleep: Forwarded to WLCPCommunication as
                WLCPTelegramParseSleep.
            ReconnectAutomatically: When true, a suspected disconnect
                restarts the connection and resends pending telegrams.
            maxTimeBeforePacketIsResent: Age in seconds after which an
                unanswered telegram is resent; -1 disables resending.
        """
        self.DDCPTelegramReceived = axel.Event()
        self.CorruptedDDCPTelegramReceived = axel.Event()
        self.wlcp = WLCPCommunication(
            COMPort, WLCPTelegramParseSleep=DDCPTelegramParseSleep)
        self.wlcp.TelegramReceived += self.__telegramReceived
        # Received bytes that have not been cut into a telegram yet.
        self.telReceivedData = []
        # Request ID -> callback waiting for the answer.
        self.sendTelegramCallbacks = {}
        # Request ID -> {'Telegram', 'Callback', 'Time'} of unanswered sends.
        self.sendTelegramReconnect = {}
        self.ReconnectAutomatically = ReconnectAutomatically
        self.IsRestarting = False
        self.IsStopped = True
        self.receivedTelRecently = True
        self.MaxTimeBeforePacketIsResent = maxTimeBeforePacketIsResent
        self.CheckDisconnectTimer = None

    def __armDisconnectTimer(self):
        """Schedule the next watchdog run, replacing any timer already armed."""
        # A reconnect triggered from inside the watchdog arms a timer through
        # Start(); cancelling first keeps exactly one timer alive.
        if self.CheckDisconnectTimer is not None:
            self.CheckDisconnectTimer.cancel()
        self.CheckDisconnectTimer = Timer(
            self._WATCHDOG_INTERVAL, self.__isDisconnected)
        self.CheckDisconnectTimer.start()

    def __isDisconnected(self):
        """Watchdog body: detect silence, resend overdue telegrams, re-arm."""
        if not self.receivedTelRecently and self.sendTelegramReconnect:
            self.__probablyDisconnected()
        if self.MaxTimeBeforePacketIsResent != -1:
            self.__resendOverdueTelegrams(time.time())
        # Cleared here and set again by the next valid telegram.
        self.receivedTelRecently = False
        if not self.IsStopped:
            self.__armDisconnectTimer()

    def __resendOverdueTelegrams(self, now):
        """Resend telegrams older than the resend limit.

        A telegram older than twice the limit is treated as a sign of a
        lost connection instead and ends the scan.
        """
        limit = self.MaxTimeBeforePacketIsResent
        for key, entry in list(self.sendTelegramReconnect.items()):
            age = now - entry['Time']
            if age > 2 * limit:
                self.__probablyDisconnected()
                break
            if age > limit:
                self.SendDDCPTelegram(entry['Telegram'], entry['Callback'])
                # SendDDCPTelegram stamped the entry with the current time;
                # restore the first-send time so the 2x check above keeps
                # measuring from the original send.
                pending = self.sendTelegramReconnect.get(key)
                if pending is not None:
                    pending['Time'] = entry['Time']
                print("Resent telegram due to more than " +
                      str(self.MaxTimeBeforePacketIsResent) + " seconds has elapsed")

    def __probablyDisconnectedWLCP(self, *args, **kwargs):
        """Handler for the WLCP ProbablyDisconnected event."""
        self.__probablyDisconnected()

    def __probablyDisconnected(self):
        """Restart the connection and resend every unanswered telegram.

        Does nothing when automatic reconnect is disabled or a restart is
        already in progress.
        """
        if not self.ReconnectAutomatically or self.IsRestarting:
            return
        self.IsRestarting = True
        try:
            self.Stop()
            self.Start()
            pending = dict(self.sendTelegramReconnect)
            self.sendTelegramReconnect = {}
        finally:
            # Never leave the flag set: SendDDCPTelegram skips the actual
            # transmission while it is true.
            self.IsRestarting = False
        for entry in pending.values():
            sleep(0.3)
            self.SendDDCPTelegram(entry['Telegram'], entry['Callback'])

    def __telegramReceived(self, *args, **kwargs):
        """Buffer the payload of a WLCP telegram that carries DDCP data."""
        if WLCPTelegramTypes.DDCPTelegram == kwargs["Type"]:
            self.telReceivedData.extend(kwargs["Data"])
            self.__parseDDCPTelegram()

    def __parseDDCPTelegram(self):
        """Dispatch every complete telegram currently in the buffer."""
        while self.__parseNextTelegram():
            pass

    def __parseNextTelegram(self):
        """Cut one telegram from the buffer and dispatch it.

        Layout read here: byte 0 request ID, bytes 1-2 total length
        (little endian), byte 3 message type, byte 4 type header, then
        data, and the CRC as the last byte.

        Returns:
            True when a telegram was consumed, False when more bytes are
            needed or the buffer had to be discarded.
        """
        buffered = self.telReceivedData
        if len(buffered) < 3:
            return False
        telegramLength = int.from_bytes(buffered[1:3], byteorder="little")
        if telegramLength < self._MIN_TELEGRAM_LENGTH:
            # The length field cannot be trusted, so there is no way to find
            # the next telegram boundary; drop the buffer instead of failing
            # on the same bytes for every later receive.
            self.telReceivedData = []
            self.CorruptedDDCPTelegramReceived(
                RequestID=buffered[0], Length=telegramLength,
                MessageType=None, TypeHeader=None, Data=buffered, CRC=None)
            return False
        if telegramLength > len(buffered):
            return False

        telegram = buffered[:telegramLength]
        self.telReceivedData = buffered[telegramLength:]
        requestID = telegram[0]
        data = telegram[5:-1]
        crc = telegram[-1]
        fields = dict(RequestID=requestID, Length=telegramLength,
                      MessageType=telegram[3], TypeHeader=telegram[4],
                      Data=data, CRC=crc)

        if calculate_crc_arr(data) != crc:
            self.CorruptedDDCPTelegramReceived(**fields)
            return True

        # Clear the bookkeeping before running user code so a callback may
        # send a follow-up telegram that reuses the same request ID. The ID
        # may be unknown (unsolicited telegram, or cleared by a reconnect).
        callback = self.sendTelegramCallbacks.pop(requestID, None)
        self.sendTelegramReconnect.pop(requestID, None)
        self.receivedTelRecently = True
        if callback is not None:
            callback(**fields)
        self.DDCPTelegramReceived(**fields)
        return True

    def SendDDCPTelegram(self, Telegram: "DDCPTelegram", callback=None):
        """
        Sends a single DDCP telegram.

        The callback function will be called when the drive has sent a
        telegram back with the same request id. While a restart is in
        progress the telegram is only recorded as pending, not transmitted.
        """
        if callback is not None:
            self.sendTelegramCallbacks[Telegram.RequestID] = callback
        self.sendTelegramReconnect[Telegram.RequestID] = {
            'Telegram': Telegram, 'Callback': callback, 'Time': time.time()}
        if not self.IsRestarting:
            self.wlcp.SendDDCPTelegram(Telegram)

    def Start(self):
        """
        Starts the connection to the drive by sending powerup packages until
        a response has been sent back, and arms the disconnect watchdog.
        """
        self.IsStopped = False
        self.wlcp.Start()
        self.wlcp.ProbablyDisconnected += self.__probablyDisconnectedWLCP
        self.__armDisconnectTimer()

    def Stop(self):
        """Stop the watchdog and the WLCP connection.

        Safe to call before Start(): the timer is only cancelled if one
        has been created.
        """
        self.IsStopped = True
        if self.CheckDisconnectTimer is not None:
            self.CheckDisconnectTimer.cancel()
        self.wlcp.Stop()
        self.wlcp.ProbablyDisconnected -= self.__probablyDisconnectedWLCP
Methods
def SendDDCPTelegram(self, Telegram: DDCPTelegram, callback=None)
-
Sends a single DDCP telegram. The callback function is called when the drive sends back a telegram with the same request ID.
Expand source code
def SendDDCPTelegram(self, Telegram: "DDCPTelegram", callback=None):
    """
    Sends a single DDCP telegram.

    The telegram is recorded under its request ID together with the
    callback and the current time, so that it stays pending until it is
    answered. The callback function will be called when the drive has sent
    a telegram back with the same request id. While IsRestarting is set
    the telegram is only recorded, not handed to the WLCP layer.
    """
    request_id = Telegram.RequestID
    if callback is not None:
        self.sendTelegramCallbacks[request_id] = callback
    # One pending record for both cases; 'Callback' is None when no
    # callback was supplied.
    self.sendTelegramReconnect[request_id] = {
        'Telegram': Telegram, 'Callback': callback, 'Time': time.time()}
    if not self.IsRestarting:
        self.wlcp.SendDDCPTelegram(Telegram)
def Start(self)
-
Starts the connection to the drive by sending power-up packets until a response is received.
Expand source code
def Start(self):
    """
    Starts the connection to the drive by sending powerup packages until a
    response has been sent back.

    Also subscribes to the WLCP ProbablyDisconnected event and arms the
    10-second disconnect check. A timer left over from an earlier Start()
    is cancelled first so that only one check is ever scheduled.
    """
    self.IsStopped = False
    self.wlcp.Start()
    self.wlcp.ProbablyDisconnected += self.__probablyDisconnectedWLCP
    stale_timer = self.CheckDisconnectTimer
    if stale_timer is not None:
        stale_timer.cancel()
    self.CheckDisconnectTimer = Timer(10, self.__isDisconnected)
    self.CheckDisconnectTimer.start()
def Stop(self)
-
Expand source code
def Stop(self):
    """Stop the disconnect check and the WLCP connection.

    Marks the communicator as stopped, cancels the pending disconnect
    timer if one exists (there is none before the first Start()), stops
    the WLCP layer and unsubscribes from its ProbablyDisconnected event.
    """
    self.IsStopped = True
    pending_timer = self.CheckDisconnectTimer
    if pending_timer is not None:
        pending_timer.cancel()
    self.wlcp.Stop()
    self.wlcp.ProbablyDisconnected -= self.__probablyDisconnectedWLCP