Module ddCommunication.controlcard
Contains the controlcard class that allows communication with a drive
Expand source code
"""
Contains the controlcard class that allows communication with a drive
"""
import time
from time import sleep
import os
import sys
import atexit
import logging
import threading
from functools import wraps
import psutil
import subprocess
import serial.tools.list_ports
from .protocols.LCPWirelessProtocol.DDCPCommunicator import DDCPCommunicator
from ._DTM import DTM
from ._controlcard_usb import controlcard_usb
from ._controlcard_flasher import controlcard_flasher
from ._controlcard_serial import controlcard_serial
from ._mutex import Mutex
from .protocols.DDCOMM32To64BitBridge.PythonClient.Subcontrolcard import Subcontrolcard
# Module-level side effect: sys.tracebacklimit is an interpreter-wide setting,
# so importing this module suppresses traceback frames for the whole process.
sys.tracebacklimit = 0 # No traceback output in report
# sys.tracebacklimit = None # For full traceback output in report
# Module logger used by the trace decorator and the controlcard class below.
LOGGER = logging.getLogger(__name__)
def log_trace_calls_dec(name):
    """
    Decorator factory for trace logging.

    The returned decorator logs, at DEBUG level, the arguments the wrapped
    callable was entered with and the value it returned.

    :param name: label used for the callable in the log messages
    :return: decorator that adds the trace logging to a callable
    """
    def log_trace_calls(func):
        @wraps(func)
        def container(*args, **kwargs):
            # The first positional argument is left out of the log; in this
            # module the decorator is applied to methods, so it is ``self``.
            # Arguments are handed to the logger unformatted so that no string
            # is built when DEBUG logging is disabled.
            LOGGER.debug(
                "%s was entered with the parameters: %s and Keyword arguments %s",
                name, args[1:], kwargs)
            results = func(*args, **kwargs)
            LOGGER.debug("%s returned %s", name, results)
            return results
        return container
    return log_trace_calls
class controlcard:
    """
    Class that allows communicating with drive

    The class is a singleton: every ``controlcard()`` call returns the object
    stored in ``controlcard.Instance``. Parameters numbered below 4096 are read
    and written through ``self.usb``; parameters from 4096 and up, as well as
    LCP button clicks, go through ``self.serial``.
    """
    Instance = None

    def __new__(cls, *args, **kwargs):
        # Hand back the existing singleton when there is one.
        if controlcard.Instance is None:
            return super(controlcard, cls).__new__(cls)
        return controlcard.Instance

    def __init__(self):
        """
        Set up the USB, flasher and (when a converter is found) serial interfaces.

        :raises Exception: if MCT 10 is running
        """
        # __init__ also runs when __new__ returned the existing singleton;
        # that object is already initialised, so do nothing.
        if controlcard.Instance is not None:
            return
        if any(name == "MCT 10 Set-up Software.exe"
               for _, name in self.__running_processes()):
            raise Exception("MCT 10 is open. Please close it.")
        controlcard.Instance = self
        try:
            self.mutex = Mutex(
                "Controlcard_singleton_1906d071-f4f1-4dd8-b0ee-582118d5e8cc")
            self.mutex.acquire()
            self.__terminate_reference_servers()
            self.dtm = DTM()
            self.usb = controlcard_usb(Subcontrolcard())
            self.flasher = controlcard_flasher(self.dtm, self.usb)
            self.__setup_serial()
            self.is_connected = False
            atexit.register(self.disconnect_drive)
        except BaseException:
            # Do not keep a half-initialised object as the singleton; the next
            # controlcard() call must be able to retry the full set-up.
            # NOTE(review): the mutex is not released here because its release
            # API is not visible in this module - confirm a retry can acquire it.
            controlcard.Instance = None
            raise

    @staticmethod
    def __running_processes():
        """Yield ``(process, name)`` pairs, skipping processes whose name cannot be read."""
        for proc in psutil.process_iter():
            try:
                yield proc, proc.name()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process ended during the scan or is not accessible.
                continue

    def __terminate_reference_servers(self):
        """Terminate every running ``Protocol.Reference.Server.CMD.exe`` process."""
        for proc, name in self.__running_processes():
            if name == "Protocol.Reference.Server.CMD.exe":
                try:
                    proc.terminate()
                except psutil.NoSuchProcess:
                    # Already gone, which is the state we wanted.
                    pass

    @staticmethod
    def __find_serial_port():
        """
        Return the device name of the single attached USB to RS485 converter.

        :raises ValueError: if no converter or more than one converter is found
        """
        converter_names = [
            'ICPDAS I-7561U USB Serial Converter',
            "MOXA USB Serial Port",
            "ADAM-4561/4562 Driver",
            "Silicon Labs CP210x USB to UART Bridge",
            "I-756x",
            "Prolific USB-to-Serial Comm Port"]
        # any() counts each port once, even if its description matches
        # several of the converter names.
        ports = [
            com_port.device
            for com_port in serial.tools.list_ports.comports()
            if any(name in com_port.description for name in converter_names)]
        if len(ports) > 1:
            raise ValueError("Too many ports found for LCPsim")
        if not ports:
            raise ValueError(
                "Cannot find com port for LCPsim. Looking for a usb to rs485 "
                "converter named either: "
                + "".join(f"\n\t{name}" for name in converter_names))
        return ports[0]

    def __setup_serial(self):
        """
        Create ``self.serial``; leave it as ``None`` when set-up raises ValueError.

        A ValueError (for example no converter attached) is only logged as a
        warning so that the USB interface stays usable. Any other exception
        propagates unchanged.
        """
        self.serial = None
        self._serial_error = ""
        try:
            com = self.__find_serial_port()
            self.serial = controlcard_serial(DDCPCommunicator(com))
        except ValueError as value_error:
            LOGGER.warning(value_error)
            self._serial_error = str(value_error)

    def __require_serial(self):
        """
        Return the serial interface.

        :raises ConnectionError: if the serial interface could not be set up
        """
        if self.serial is None:
            raise ConnectionError(
                f"Serial communication is not available. {self._serial_error}")
        return self.serial

    def __check_for_running_processes(self, list_of_processes):
        """
        Raise if any of the given names occurs in the output of ``tasklist``.

        :param list_of_processes: names to look for (case-sensitive substring match)
        :raises Exception: for the first name that is found
        """
        # Decode once so the names are matched against text, not a bytes repr.
        task_list = subprocess.check_output('tasklist').decode(errors="replace")
        for process_name in list_of_processes:
            if process_name in task_list:
                raise Exception(f"{process_name} is running. Please close it.")

    @log_trace_calls_dec("connect_drive")
    def connect_drive(self):
        """
        Establish communication to the drive.
        Default fieldbus is 'USB 1', which is currently also the only possible solution.

        :raises Exception: if MCT 10 is running
        """
        self.__check_for_running_processes(["MCT 10 Set-up Software"])
        LOGGER.info('Connecting to drive...')
        self.usb.connect_drive()
        LOGGER.info('Drive connected')
        self.is_connected = self.usb.is_connected

    @log_trace_calls_dec("disconnect_drive")
    def disconnect_drive(self):
        """ Close communication to drive. """
        self.usb.disconnect_drive()
        # NOTE(review): fixed one-second pause after disconnecting; the reason
        # is not visible in this module.
        sleep(1)
        LOGGER.info('Drive is disconnected')
        self.is_connected = self.usb.is_connected

    # If paramInfo is set to anything other than true, the parameter value
    # will never be returned.
    @log_trace_calls_dec("get_drive")
    def get_drive(self, param_number, index_number=0):
        """
        Read parameter
        Expects at least one input (the parameter number), but it is also possible to specify the
        index. As default index is 0.
        Dependency:
        `connect_drive` must be called before this keyword
        :param param_number: parameter number
        :param index_number: default index number equals 0, but some parameters provides the possibility of more indexes
        :return: returns the value of the parameter
        """
        if int(param_number) < 4096:
            return self.usb.get_drive(param_number, index_number, "None")
        # Parameters from 4096 and up are read over the serial interface.
        self.__check_for_running_processes(["lcpsim"])
        return self.__get_drive_serial(param_number, index_number)

    def __get_drive_serial(self, param_number, index_number):
        """Read a parameter over serial, connecting for the duration of the call if needed."""
        serial_link = self.__require_serial()
        alreadyconnected = serial_link.is_connected
        if not alreadyconnected:
            serial_link.connect_drive()
        try:
            value = serial_link.get_drive(param_number, index_number)
        except Exception:
            # A failed read always drops the serial connection.
            serial_link.disconnect_drive()
            raise
        if not alreadyconnected:
            serial_link.disconnect_drive()
        return value

    @log_trace_calls_dec("set_drive")
    def set_drive(self, param_number, value, index_number=0):
        """
        Write to parameter
        Expects at least two inputs (the parameter number and a value), but
        it is also possible to specify the index. As default index is 0.
        Dependency:
        `connect_drive` must be called before this keyword
        :param param_number: parameter number
        :param value: value or choice number that the parameter should be set to
        :param index_number: default index number equals 0, but some parameters provides the possibility of more indexes
        :return: result of the USB write; None for parameters written over serial
        """
        if int(param_number) >= 4096:
            self.__check_for_running_processes(["lcpsim"])
            return self.__set_drive_serial(param_number, value, index_number)
        try:
            return self.usb.set_drive(param_number, value, index_number)
        except ConnectionError:
            # A USB write is retried exactly once after a connection error.
            LOGGER.info(
                f"Failed setting {param_number} to {value}. Trying again")
            result = self.usb.set_drive(param_number, value, index_number)
            LOGGER.info(
                f"Successfully set value of {param_number} to {value} after retry")
            return result

    def __set_drive_serial(self, param_number, value, index_number):
        """Write a parameter over serial, connecting for the duration of the call if needed."""
        serial_link = self.__require_serial()
        alreadyconnected = serial_link.is_connected
        if not alreadyconnected:
            serial_link.connect_drive()
        try:
            # NOTE(review): a guard that rejected index numbers other than 0
            # for parameters above 40-96 used to be here but was disabled;
            # confirm that indexed writes are supported over serial.
            serial_link.set_drive(param_number, value, index_number)
        except Exception:
            # A failed write always drops the serial connection.
            serial_link.disconnect_drive()
            raise
        if not alreadyconnected:
            serial_link.disconnect_drive()

    @log_trace_calls_dec("powercycle")
    def powercycle(self, ping_attempts=120):
        """
        Restarts the drive and waits (ping_attempts) until the drive is ready again
        :param ping_attempts: max number of ping attempts. If the drive is not ready after the ping attempts an error is raised.
        """
        self.usb.powercycle(ping_attempts)

    @log_trace_calls_dec("update_powersize")
    def update_powersize(self, typecode, force_update=False):
        """
        Change drive powersize
        :param typecode: Drive typecode (name) (12 characters, e.g. FC-302P4K0T5)
        :param force_update: Default is False. If False, the drive is expected to first be connected. True may be used if the drive cannot be connected (in bootmode).
        :raises Exception: if MCT 10 or lcpsim is running
        """
        self.__check_for_running_processes(["MCT 10 Set-up Software", "lcpsim"])
        self.flasher.flash(typecode, force_flash=force_update)

    @log_trace_calls_dec("ping_drive")
    def ping_drive(self, max_try=120):
        """
        Ping drive. Tries to establish connection to the drive.
        It tries until max. number of tries is reached. Default number of maximum tries is 120.
        :param max_try: max number of ping attempts for the drive to get ready state
        :return: Returns True if drive is ready before the max ping attempts, returns False if the drive is not ready after the ping attempts.
        """
        return self.usb.ping_drive(attempts=max_try)

    @log_trace_calls_dec("click_lcp_button")
    def click_lcp_button(self, button):
        """
        Click a button on LCPsim.
        Not all software supports the click button functionality.
        :param button: LCP button to click. Possible buttons to click are: auto_on, hand_on, off, ok, back
        :raises Exception: if lcpsim is running
        """
        self.__check_for_running_processes(["lcpsim"])
        serial_link = self.__require_serial()
        alreadyconnected = serial_link.is_connected
        if not alreadyconnected:
            serial_link.connect_drive()
        try:
            serial_link.click_lcp_button(button)
        finally:
            # Close the connection opened above even when the click fails.
            if not alreadyconnected:
                serial_link.disconnect_drive()
Functions
def log_trace_calls_dec(name)
-
Decorator for trace logging
Expand source code
def log_trace_calls_dec(name):
    """
    Decorator factory for trace logging.

    The returned decorator logs, at DEBUG level, the arguments the wrapped
    callable was entered with and the value it returned.

    :param name: label used for the callable in the log messages
    :return: decorator that adds the trace logging to a callable
    """
    def log_trace_calls(func):
        @wraps(func)
        def container(*args, **kwargs):
            # The first positional argument is left out of the log; in this
            # module the decorator is applied to methods, so it is ``self``.
            # Arguments are handed to the logger unformatted so that no string
            # is built when DEBUG logging is disabled.
            LOGGER.debug(
                "%s was entered with the parameters: %s and Keyword arguments %s",
                name, args[1:], kwargs)
            results = func(*args, **kwargs)
            LOGGER.debug("%s returned %s", name, results)
            return results
        return container
    return log_trace_calls
Classes
class controlcard
-
Class that allows communicating with drive
Expand source code
class controlcard:
    """
    Class that allows communicating with drive

    The class is a singleton: every ``controlcard()`` call returns the object
    stored in ``controlcard.Instance``. Parameters numbered below 4096 are read
    and written through ``self.usb``; parameters from 4096 and up, as well as
    LCP button clicks, go through ``self.serial``.
    """
    Instance = None

    def __new__(cls, *args, **kwargs):
        # Hand back the existing singleton when there is one.
        if controlcard.Instance is None:
            return super(controlcard, cls).__new__(cls)
        return controlcard.Instance

    def __init__(self):
        """
        Set up the USB, flasher and (when a converter is found) serial interfaces.

        :raises Exception: if MCT 10 is running
        """
        # __init__ also runs when __new__ returned the existing singleton;
        # that object is already initialised, so do nothing.
        if controlcard.Instance is not None:
            return
        if any(name == "MCT 10 Set-up Software.exe"
               for _, name in self.__running_processes()):
            raise Exception("MCT 10 is open. Please close it.")
        controlcard.Instance = self
        try:
            self.mutex = Mutex(
                "Controlcard_singleton_1906d071-f4f1-4dd8-b0ee-582118d5e8cc")
            self.mutex.acquire()
            self.__terminate_reference_servers()
            self.dtm = DTM()
            self.usb = controlcard_usb(Subcontrolcard())
            self.flasher = controlcard_flasher(self.dtm, self.usb)
            self.__setup_serial()
            self.is_connected = False
            atexit.register(self.disconnect_drive)
        except BaseException:
            # Do not keep a half-initialised object as the singleton; the next
            # controlcard() call must be able to retry the full set-up.
            # NOTE(review): the mutex is not released here because its release
            # API is not visible in this module - confirm a retry can acquire it.
            controlcard.Instance = None
            raise

    @staticmethod
    def __running_processes():
        """Yield ``(process, name)`` pairs, skipping processes whose name cannot be read."""
        for proc in psutil.process_iter():
            try:
                yield proc, proc.name()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process ended during the scan or is not accessible.
                continue

    def __terminate_reference_servers(self):
        """Terminate every running ``Protocol.Reference.Server.CMD.exe`` process."""
        for proc, name in self.__running_processes():
            if name == "Protocol.Reference.Server.CMD.exe":
                try:
                    proc.terminate()
                except psutil.NoSuchProcess:
                    # Already gone, which is the state we wanted.
                    pass

    @staticmethod
    def __find_serial_port():
        """
        Return the device name of the single attached USB to RS485 converter.

        :raises ValueError: if no converter or more than one converter is found
        """
        converter_names = [
            'ICPDAS I-7561U USB Serial Converter',
            "MOXA USB Serial Port",
            "ADAM-4561/4562 Driver",
            "Silicon Labs CP210x USB to UART Bridge",
            "I-756x",
            "Prolific USB-to-Serial Comm Port"]
        # any() counts each port once, even if its description matches
        # several of the converter names.
        ports = [
            com_port.device
            for com_port in serial.tools.list_ports.comports()
            if any(name in com_port.description for name in converter_names)]
        if len(ports) > 1:
            raise ValueError("Too many ports found for LCPsim")
        if not ports:
            raise ValueError(
                "Cannot find com port for LCPsim. Looking for a usb to rs485 "
                "converter named either: "
                + "".join(f"\n\t{name}" for name in converter_names))
        return ports[0]

    def __setup_serial(self):
        """
        Create ``self.serial``; leave it as ``None`` when set-up raises ValueError.

        A ValueError (for example no converter attached) is only logged as a
        warning so that the USB interface stays usable. Any other exception
        propagates unchanged.
        """
        self.serial = None
        self._serial_error = ""
        try:
            com = self.__find_serial_port()
            self.serial = controlcard_serial(DDCPCommunicator(com))
        except ValueError as value_error:
            LOGGER.warning(value_error)
            self._serial_error = str(value_error)

    def __require_serial(self):
        """
        Return the serial interface.

        :raises ConnectionError: if the serial interface could not be set up
        """
        if self.serial is None:
            raise ConnectionError(
                f"Serial communication is not available. {self._serial_error}")
        return self.serial

    def __check_for_running_processes(self, list_of_processes):
        """
        Raise if any of the given names occurs in the output of ``tasklist``.

        :param list_of_processes: names to look for (case-sensitive substring match)
        :raises Exception: for the first name that is found
        """
        # Decode once so the names are matched against text, not a bytes repr.
        task_list = subprocess.check_output('tasklist').decode(errors="replace")
        for process_name in list_of_processes:
            if process_name in task_list:
                raise Exception(f"{process_name} is running. Please close it.")

    @log_trace_calls_dec("connect_drive")
    def connect_drive(self):
        """
        Establish communication to the drive.
        Default fieldbus is 'USB 1', which is currently also the only possible solution.

        :raises Exception: if MCT 10 is running
        """
        self.__check_for_running_processes(["MCT 10 Set-up Software"])
        LOGGER.info('Connecting to drive...')
        self.usb.connect_drive()
        LOGGER.info('Drive connected')
        self.is_connected = self.usb.is_connected

    @log_trace_calls_dec("disconnect_drive")
    def disconnect_drive(self):
        """ Close communication to drive. """
        self.usb.disconnect_drive()
        # NOTE(review): fixed one-second pause after disconnecting; the reason
        # is not visible in this module.
        sleep(1)
        LOGGER.info('Drive is disconnected')
        self.is_connected = self.usb.is_connected

    # If paramInfo is set to anything other than true, the parameter value
    # will never be returned.
    @log_trace_calls_dec("get_drive")
    def get_drive(self, param_number, index_number=0):
        """
        Read parameter
        Expects at least one input (the parameter number), but it is also possible to specify the
        index. As default index is 0.
        Dependency:
        `connect_drive` must be called before this keyword
        :param param_number: parameter number
        :param index_number: default index number equals 0, but some parameters provides the possibility of more indexes
        :return: returns the value of the parameter
        """
        if int(param_number) < 4096:
            return self.usb.get_drive(param_number, index_number, "None")
        # Parameters from 4096 and up are read over the serial interface.
        self.__check_for_running_processes(["lcpsim"])
        return self.__get_drive_serial(param_number, index_number)

    def __get_drive_serial(self, param_number, index_number):
        """Read a parameter over serial, connecting for the duration of the call if needed."""
        serial_link = self.__require_serial()
        alreadyconnected = serial_link.is_connected
        if not alreadyconnected:
            serial_link.connect_drive()
        try:
            value = serial_link.get_drive(param_number, index_number)
        except Exception:
            # A failed read always drops the serial connection.
            serial_link.disconnect_drive()
            raise
        if not alreadyconnected:
            serial_link.disconnect_drive()
        return value

    @log_trace_calls_dec("set_drive")
    def set_drive(self, param_number, value, index_number=0):
        """
        Write to parameter
        Expects at least two inputs (the parameter number and a value), but
        it is also possible to specify the index. As default index is 0.
        Dependency:
        `connect_drive` must be called before this keyword
        :param param_number: parameter number
        :param value: value or choice number that the parameter should be set to
        :param index_number: default index number equals 0, but some parameters provides the possibility of more indexes
        :return: result of the USB write; None for parameters written over serial
        """
        if int(param_number) >= 4096:
            self.__check_for_running_processes(["lcpsim"])
            return self.__set_drive_serial(param_number, value, index_number)
        try:
            return self.usb.set_drive(param_number, value, index_number)
        except ConnectionError:
            # A USB write is retried exactly once after a connection error.
            LOGGER.info(
                f"Failed setting {param_number} to {value}. Trying again")
            result = self.usb.set_drive(param_number, value, index_number)
            LOGGER.info(
                f"Successfully set value of {param_number} to {value} after retry")
            return result

    def __set_drive_serial(self, param_number, value, index_number):
        """Write a parameter over serial, connecting for the duration of the call if needed."""
        serial_link = self.__require_serial()
        alreadyconnected = serial_link.is_connected
        if not alreadyconnected:
            serial_link.connect_drive()
        try:
            # NOTE(review): a guard that rejected index numbers other than 0
            # for parameters above 40-96 used to be here but was disabled;
            # confirm that indexed writes are supported over serial.
            serial_link.set_drive(param_number, value, index_number)
        except Exception:
            # A failed write always drops the serial connection.
            serial_link.disconnect_drive()
            raise
        if not alreadyconnected:
            serial_link.disconnect_drive()

    @log_trace_calls_dec("powercycle")
    def powercycle(self, ping_attempts=120):
        """
        Restarts the drive and waits (ping_attempts) until the drive is ready again
        :param ping_attempts: max number of ping attempts. If the drive is not ready after the ping attempts an error is raised.
        """
        self.usb.powercycle(ping_attempts)

    @log_trace_calls_dec("update_powersize")
    def update_powersize(self, typecode, force_update=False):
        """
        Change drive powersize
        :param typecode: Drive typecode (name) (12 characters, e.g. FC-302P4K0T5)
        :param force_update: Default is False. If False, the drive is expected to first be connected. True may be used if the drive cannot be connected (in bootmode).
        :raises Exception: if MCT 10 or lcpsim is running
        """
        self.__check_for_running_processes(["MCT 10 Set-up Software", "lcpsim"])
        self.flasher.flash(typecode, force_flash=force_update)

    @log_trace_calls_dec("ping_drive")
    def ping_drive(self, max_try=120):
        """
        Ping drive. Tries to establish connection to the drive.
        It tries until max. number of tries is reached. Default number of maximum tries is 120.
        :param max_try: max number of ping attempts for the drive to get ready state
        :return: Returns True if drive is ready before the max ping attempts, returns False if the drive is not ready after the ping attempts.
        """
        return self.usb.ping_drive(attempts=max_try)

    @log_trace_calls_dec("click_lcp_button")
    def click_lcp_button(self, button):
        """
        Click a button on LCPsim.
        Not all software supports the click button functionality.
        :param button: LCP button to click. Possible buttons to click are: auto_on, hand_on, off, ok, back
        :raises Exception: if lcpsim is running
        """
        self.__check_for_running_processes(["lcpsim"])
        serial_link = self.__require_serial()
        alreadyconnected = serial_link.is_connected
        if not alreadyconnected:
            serial_link.connect_drive()
        try:
            serial_link.click_lcp_button(button)
        finally:
            # Close the connection opened above even when the click fails.
            if not alreadyconnected:
                serial_link.disconnect_drive()
Class variables
var Instance
Methods
def click_lcp_button(self, button)
-
Click a button on LCPsim. Not all software supports the click button functionality. :param button: LCP button to click. Possible buttons to click are: auto_on, hand_on, off, ok, back
Expand source code
@log_trace_calls_dec("click_lcp_button")
def click_lcp_button(self, button):
    """
    Click a button on LCPsim.
    Not all software supports the click button functionality.
    :param button: LCP button to click. Possible buttons to click are: auto_on, hand_on, off, ok, back
    :raises Exception: if lcpsim is running
    """
    self.__check_for_running_processes(["lcpsim"])
    alreadyconnected = self.serial.is_connected
    if not alreadyconnected:
        self.serial.connect_drive()
    try:
        self.serial.click_lcp_button(button)
    finally:
        # Close the connection opened above even when the click fails, so a
        # failed click does not leave the serial link open.
        if not alreadyconnected:
            self.serial.disconnect_drive()
def connect_drive(self)
-
Establish communication to the drive. Default fieldbus is 'USB 1', which is currently also the only possible solution.
Expand source code
@log_trace_calls_dec("connect_drive")
def connect_drive(self):
    """
    Open the USB connection to the drive.

    Default fieldbus is 'USB 1', which is currently also the only possible solution.

    :raises Exception: if MCT 10 is running
    """
    blocking_programs = ["MCT 10 Set-up Software"]
    self.__check_for_running_processes(blocking_programs)
    LOGGER.info('Connecting to drive...')
    usb_link = self.usb
    usb_link.connect_drive()
    LOGGER.info('Drive connected')
    # Mirror the connection state reported by the USB interface.
    self.is_connected = usb_link.is_connected
def disconnect_drive(self)
-
Close communication to drive.
Expand source code
@log_trace_calls_dec("disconnect_drive")
def disconnect_drive(self):
    """Close the USB connection to the drive and refresh ``is_connected``."""
    usb_link = self.usb
    usb_link.disconnect_drive()
    # NOTE(review): fixed one-second pause after disconnecting; the reason is
    # not visible in this module.
    sleep(1)
    LOGGER.info('Drive is disconnected')
    # Mirror the connection state reported by the USB interface.
    self.is_connected = usb_link.is_connected
def get_drive(self, param_number, index_number=0)
-
Read a parameter. Expects at least one input (the parameter number), but it is also possible to specify the index. The default index is 0.
Dependency: connect_drive must be called before this keyword.
:param param_number: parameter number :param index_number: default index number equals 0, but some parameters provide the possibility of more indexes :return: returns the value of the parameter
Expand source code
@log_trace_calls_dec("get_drive")
def get_drive(self, param_number, index_number=0):
    """
    Read a parameter from the drive.

    Dependency:
    `connect_drive` must be called before this keyword

    :param param_number: parameter number
    :param index_number: index to read; defaults to 0, some parameters have more indexes
    :return: the value of the parameter
    """
    if int(param_number) < 4096:
        return self.usb.get_drive(param_number, index_number, "None")
    # Parameters from 4096 and up are read over the serial interface.
    self.__check_for_running_processes(["lcpsim"])
    return self.__get_drive_serial(param_number, index_number)
def ping_drive(self, max_try=120)
-
Ping drive. Tries to establish connection to the drive. It tries until the maximum number of tries is reached. The default maximum number of tries is 120. :param max_try: max number of ping attempts for the drive to get ready state :return: Returns True if the drive is ready before the max ping attempts, returns False if the drive is not ready after the ping attempts.
Expand source code
@log_trace_calls_dec("ping_drive")
def ping_drive(self, max_try=120):
    """
    Ping the drive until it answers or the attempts run out.

    :param max_try: max number of ping attempts for the drive to get ready state (default 120)
    :return: True if the drive is ready before the max ping attempts, False if it is not ready after them
    """
    return self.usb.ping_drive(attempts=max_try)
def powercycle(self, ping_attempts=120)
-
Restarts the drive and waits (ping_attempts) until the drive is ready again :param ping_attempts: max number of ping attempts. If the drive is not ready after the ping attempts an error is raised.
Expand source code
@log_trace_calls_dec("powercycle")
def powercycle(self, ping_attempts=120):
    """
    Restart the drive and wait until it is ready again.

    :param ping_attempts: max number of ping attempts. If the drive is not ready after the ping attempts an error is raised.
    """
    usb_link = self.usb
    usb_link.powercycle(ping_attempts)
def set_drive(self, param_number, value, index_number=0)
-
Write to a parameter. Expects at least two inputs (the parameter number and a value), but it is also possible to specify the index. The default index is 0.
Dependency: connect_drive must be called before this keyword.
:param param_number: parameter number :param value: value or choice number that the parameter should be set to :param index_number: default index number equals 0, but some parameters provide the possibility of more indexes
Expand source code
@log_trace_calls_dec("set_drive")
def set_drive(self, param_number, value, index_number=0):
    """
    Write a value to a parameter on the drive.

    Dependency:
    `connect_drive` must be called before this keyword

    :param param_number: parameter number
    :param value: value or choice number that the parameter should be set to
    :param index_number: index to write; defaults to 0, some parameters have more indexes
    :return: result of the USB write, or of the serial write helper for parameters from 4096 and up
    """
    use_serial = int(param_number) >= 4096
    if use_serial:
        self.__check_for_running_processes(["lcpsim"])
        return self.__set_drive_serial(param_number, value, index_number)
    try:
        return self.usb.set_drive(param_number, value, index_number)
    except ConnectionError:
        # A USB write is retried exactly once after a connection error.
        LOGGER.info(
            f"Failed setting {param_number} to {value}. Trying again")
        outcome = self.usb.set_drive(param_number, value, index_number)
        LOGGER.info(
            f"Successfully set value of {param_number} to {value} after retry")
        return outcome
def update_powersize(self, typecode, force_update=False)
-
Change drive powersize :param typecode: Drive typecode (name) (12 characters, e.g. FC-302P4K0T5) :param force_update: Default is False. If False, the drive is expected to first be connected. True may be used if the drive cannot be connected (in bootmode).
Expand source code
@log_trace_calls_dec("update_powersize")
def update_powersize(self, typecode, force_update=False):
    """
    Flash the drive to another power size.

    :param typecode: Drive typecode (name) (12 characters, e.g. FC-302P4K0T5)
    :param force_update: Default is False. If False, the drive is expected to first be connected. True may be used if the drive cannot be connected (in bootmode).
    :raises Exception: if MCT 10 or lcpsim is running
    """
    blocking_programs = ["MCT 10 Set-up Software", "lcpsim"]
    self.__check_for_running_processes(blocking_programs)
    self.flasher.flash(typecode, force_flash=force_update)