Module ddCommunication.controlcard

Contains controlcard class that allow communication with drive

Expand source code
"""
Contains controlcard class that allow communication with drive
"""

import time
from time import sleep
import os

import sys
import atexit
import logging
import threading
from functools import wraps
import psutil
import subprocess
import serial.tools.list_ports
from .protocols.LCPWirelessProtocol.DDCPCommunicator import DDCPCommunicator
from ._DTM import DTM
from ._controlcard_usb import controlcard_usb
from ._controlcard_flasher import controlcard_flasher
from ._controlcard_serial import controlcard_serial
from ._mutex import Mutex
from .protocols.DDCOMM32To64BitBridge.PythonClient.Subcontrolcard import Subcontrolcard

sys.tracebacklimit = 0  # For none traceback output in report
# sys.tracebacklimit = None # For full traceback output in report
LOGGER = logging.getLogger(__name__)


def log_trace_calls_dec(name):
    """
    Decorator for trace logging
    """
    def log_trace_calls(func):
        @wraps(func)
        def container(*args, **kwargs):
            params_str = str(args[1:]) + \
                " and Keyword arguments " + str(kwargs)
            LOGGER.debug(f"{name} was entered with the parameters: {params_str}")
            results = func(*args, **kwargs)
            LOGGER.debug(f"{name} returned {str(results)}")
            return results

        return container

    return log_trace_calls

class controlcard:
    """
    Class that allows communicating with drive
    """
    Instance = None

    def __new__(cls, *args, **kwargs):
        if controlcard.Instance is None:
            return super(controlcard, cls).__new__(cls)

        return controlcard.Instance

    def __init__(self):
        if controlcard.Instance is not None:
            return

        if "MCT 10 Set-up Software.exe" in (p.name()
                                            for p in psutil.process_iter()):
            raise Exception("MCT 10 is open. Please close it.")
        controlcard.Instance = self
        self.mutex = Mutex(
           "Controlcard_singleton_1906d071-f4f1-4dd8-b0ee-582118d5e8cc")
        self.mutex.acquire()
        for proc in psutil.process_iter():
            if proc.name() == "Protocol.Reference.Server.CMD.exe":
                proc.terminate()

        self.dtm = DTM()

        sub_controlcard = Subcontrolcard()

        self.usb = controlcard_usb(sub_controlcard)
        self.flasher = controlcard_flasher(self.dtm, self.usb)
        try:
            comports = serial.tools.list_ports
            com_ports = list(comports.comports())
            converter_names = [
                'ICPDAS I-7561U USB Serial Converter',
                "MOXA USB Serial Port",
                "ADAM-4561/4562 Driver",
                "Silicon Labs CP210x USB to UART Bridge",
                "I-756x",
                "Prolific USB-to-Serial Comm Port"]
            ports = []
            for com_port in com_ports:
                for converter_name in converter_names:
                    if converter_name in com_port.description:
                        ports.append(com_port.device)

            if len(ports) > 1:
                raise ValueError("Too many ports found for LCPsim")
            if len(ports)< 1:
                error_msg = "Cannot find com port for LCPsim. Looking for a usb to rs485 \
                converter named either: "
                for name in converter_names:
                    error_msg += f"\n\t{name}"
                raise ValueError(error_msg)

            com = ports[0]
            ddcp = DDCPCommunicator(com)
            self.serial = controlcard_serial(ddcp)
        except ValueError as value_error:
            LOGGER.warning(value_error)
        except Exception as error:
            raise Exception(error)

        self.is_connected = False

        atexit.register(self.disconnect_drive)

    def __check_for_running_processes(self, list_of_processes):
        programs = subprocess.check_output('tasklist')
        for process_name in list_of_processes:
            if process_name in str(programs):
                raise Exception(f"{process_name} is running. Please close it.")

    @log_trace_calls_dec("connect_drive")
    def connect_drive(self):
        """
        Establish communication to the drive.
        Default fieldbus is 'USB 1', which is currently also the only possible solution.
        """
        self.__check_for_running_processes(["MCT 10 Set-up Software"])
        LOGGER.info('Connecting to drive...')
        self.usb.connect_drive()
        LOGGER.info('Drive connected')
        self.is_connected = self.usb.is_connected

    @log_trace_calls_dec("disconnect_drive")
    def disconnect_drive(self):
        """ Close communication to drive. """
        self.usb.disconnect_drive()
        sleep(1)
        LOGGER.info('Drive is disconnected')
        self.is_connected = self.usb.is_connected

    # If paramInfo is set to other than true. parameter value will never be
    # returned

    @log_trace_calls_dec("get_drive")
    def get_drive(self, param_number, index_number=0):
        """
        Read parameter
        Expects at least one input (the parameter number), but it is also possible to specify the
        index. As default index is 0.

        Dependency:
        `connect_drive` must be called before this keyword
        
        :param param_number: parameter number
        :param index_number: default index number equals 0, but some parameters provides the possibility of more indexes
        :return: returns the value of the parameter
        """
        if int(param_number) >= 4096:
            self.__check_for_running_processes(["lcpsim"])
            return self.__get_drive_serial(param_number, index_number)

        return self.usb.get_drive(param_number, index_number, "None")

    def __get_drive_serial(self, param_number, index_number):
        alreadyconnected = self.serial.is_connected
        if not alreadyconnected:
            self.serial.connect_drive()

        try:
            value = self.serial.get_drive(param_number, index_number)
        except Exception as error:
            self.serial.disconnect_drive()
            raise error

        if not alreadyconnected:
            self.serial.disconnect_drive()

        return value

    @log_trace_calls_dec("set_drive")
    def set_drive(self, param_number, value, index_number=0):
        """
        Write to parameter
        Expects at least two inputs (the parameter number and a value), but
        it is also possible to specify the index. As default index is 0.

        Dependency:
        `connect_drive` must be called before this keyword

        :param param_number: parameter number
        :param value: value or choice number that the parameter should be set to
        :param index_number: default index number equals 0, but some parameters provides the possibility of more indexes
        """
        if int(param_number) >= 4096:
            self.__check_for_running_processes(["lcpsim"])
            return self.__set_drive_serial(param_number, value, index_number)

        try:
            result = self.usb.set_drive(param_number, value, index_number)
            return result
        except ConnectionError:
            LOGGER.info(
                f"Failed setting {param_number} to {value}. Trying again")
            result = self.usb.set_drive(param_number, value, index_number)
            LOGGER.info(
                f"Successfully set value of {param_number} to {value} after retry")
            return result

    def __set_drive_serial(self, param_number, value, index_number):

        alreadyconnected = self.serial.is_connected
        if not alreadyconnected:
            self.serial.connect_drive()

        try:
            # if not IndexNr==0:
            #    raise Exception("Index number different from 0 has not yet been implemented
            #    for set_rive above par 40-96")
            self.serial.set_drive(param_number, value, index_number)
        except Exception as error:
            self.serial.disconnect_drive()
            raise error

        if not alreadyconnected:
            self.serial.disconnect_drive()


    @log_trace_calls_dec("powercycle")
    def powercycle(self, ping_attempts=120):
        """
        Restarts the drive and waits (ping_attempts) until it the drive is ready again
        :param ping_attempts: max number of ping attempts. If the drive is not ready after the ping attemps an error is raised.
        """
        self.usb.powercycle(ping_attempts)


    @log_trace_calls_dec("update_powersize")
    def update_powersize(self, typecode, force_update=False):
        """
        Change drive powersize
        :param typecode: Drive typecode (name) (12 characters, e.g. FC-302P4K0T5)
        :param force_update: Default is False. If False, the drive is expected to first be connected. True may be used if the drive cannot be connected (in bootmode).
        """
        self.__check_for_running_processes(["MCT 10 Set-up Software", "lcpsim"])
        self.flasher.flash(typecode, force_flash=force_update)


    @log_trace_calls_dec("ping_drive")
    def ping_drive(self, max_try=120):
        """
        Ping drive. Tries to establish connection to the drive.
        It tries until max. number of tries is reached. Default number of maximum tries is 120.
        :param max_try: max number of ping attempts for the drive to get ready state
        :return: Returns True if drive is ready before the max ping attemps, returns False if the drive is not ready after the ping attemps.
        """

        value = self.usb.ping_drive(attempts=max_try)
        return value

    @log_trace_calls_dec("click_lcp_button")
    def click_lcp_button(self, button):
        """
        Click a button on LCPsim.
        Not all software supports the click button functionality.
        :param button: LCP button to click. Possible buttons to click are: auto_on, hand_on, off, ok, back
        """
        self.__check_for_running_processes(["lcpsim"])
        alreadyconnected = self.serial.is_connected
        if not alreadyconnected:
            self.serial.connect_drive()

        self.serial.click_lcp_button(button)

        if not alreadyconnected:
            self.serial.disconnect_drive()

Functions

def log_trace_calls_dec(name)

Decorator for trace logging

Expand source code
def log_trace_calls_dec(name):
    """
    Decorator for trace logging
    """
    def log_trace_calls(func):
        @wraps(func)
        def container(*args, **kwargs):
            params_str = str(args[1:]) + \
                " and Keyword arguments " + str(kwargs)
            LOGGER.debug(f"{name} was entered with the parameters: {params_str}")
            results = func(*args, **kwargs)
            LOGGER.debug(f"{name} returned {str(results)}")
            return results

        return container

    return log_trace_calls

Classes

class controlcard

Class that allows communicating with drive

Expand source code
class controlcard:
    """
    Class that allows communicating with drive
    """
    Instance = None

    def __new__(cls, *args, **kwargs):
        if controlcard.Instance is None:
            return super(controlcard, cls).__new__(cls)

        return controlcard.Instance

    def __init__(self):
        if controlcard.Instance is not None:
            return

        if "MCT 10 Set-up Software.exe" in (p.name()
                                            for p in psutil.process_iter()):
            raise Exception("MCT 10 is open. Please close it.")
        controlcard.Instance = self
        self.mutex = Mutex(
           "Controlcard_singleton_1906d071-f4f1-4dd8-b0ee-582118d5e8cc")
        self.mutex.acquire()
        for proc in psutil.process_iter():
            if proc.name() == "Protocol.Reference.Server.CMD.exe":
                proc.terminate()

        self.dtm = DTM()

        sub_controlcard = Subcontrolcard()

        self.usb = controlcard_usb(sub_controlcard)
        self.flasher = controlcard_flasher(self.dtm, self.usb)
        try:
            comports = serial.tools.list_ports
            com_ports = list(comports.comports())
            converter_names = [
                'ICPDAS I-7561U USB Serial Converter',
                "MOXA USB Serial Port",
                "ADAM-4561/4562 Driver",
                "Silicon Labs CP210x USB to UART Bridge",
                "I-756x",
                "Prolific USB-to-Serial Comm Port"]
            ports = []
            for com_port in com_ports:
                for converter_name in converter_names:
                    if converter_name in com_port.description:
                        ports.append(com_port.device)

            if len(ports) > 1:
                raise ValueError("Too many ports found for LCPsim")
            if len(ports)< 1:
                error_msg = "Cannot find com port for LCPsim. Looking for a usb to rs485 \
                converter named either: "
                for name in converter_names:
                    error_msg += f"\n\t{name}"
                raise ValueError(error_msg)

            com = ports[0]
            ddcp = DDCPCommunicator(com)
            self.serial = controlcard_serial(ddcp)
        except ValueError as value_error:
            LOGGER.warning(value_error)
        except Exception as error:
            raise Exception(error)

        self.is_connected = False

        atexit.register(self.disconnect_drive)

    def __check_for_running_processes(self, list_of_processes):
        programs = subprocess.check_output('tasklist')
        for process_name in list_of_processes:
            if process_name in str(programs):
                raise Exception(f"{process_name} is running. Please close it.")

    @log_trace_calls_dec("connect_drive")
    def connect_drive(self):
        """
        Establish communication to the drive.
        Default fieldbus is 'USB 1', which is currently also the only possible solution.
        """
        self.__check_for_running_processes(["MCT 10 Set-up Software"])
        LOGGER.info('Connecting to drive...')
        self.usb.connect_drive()
        LOGGER.info('Drive connected')
        self.is_connected = self.usb.is_connected

    @log_trace_calls_dec("disconnect_drive")
    def disconnect_drive(self):
        """ Close communication to drive. """
        self.usb.disconnect_drive()
        sleep(1)
        LOGGER.info('Drive is disconnected')
        self.is_connected = self.usb.is_connected

    # If paramInfo is set to other than true. parameter value will never be
    # returned

    @log_trace_calls_dec("get_drive")
    def get_drive(self, param_number, index_number=0):
        """
        Read parameter
        Expects at least one input (the parameter number), but it is also possible to specify the
        index. As default index is 0.

        Dependency:
        `connect_drive` must be called before this keyword
        
        :param param_number: parameter number
        :param index_number: default index number equals 0, but some parameters provides the possibility of more indexes
        :return: returns the value of the parameter
        """
        if int(param_number) >= 4096:
            self.__check_for_running_processes(["lcpsim"])
            return self.__get_drive_serial(param_number, index_number)

        return self.usb.get_drive(param_number, index_number, "None")

    def __get_drive_serial(self, param_number, index_number):
        alreadyconnected = self.serial.is_connected
        if not alreadyconnected:
            self.serial.connect_drive()

        try:
            value = self.serial.get_drive(param_number, index_number)
        except Exception as error:
            self.serial.disconnect_drive()
            raise error

        if not alreadyconnected:
            self.serial.disconnect_drive()

        return value

    @log_trace_calls_dec("set_drive")
    def set_drive(self, param_number, value, index_number=0):
        """
        Write to parameter
        Expects at least two inputs (the parameter number and a value), but
        it is also possible to specify the index. As default index is 0.

        Dependency:
        `connect_drive` must be called before this keyword

        :param param_number: parameter number
        :param value: value or choice number that the parameter should be set to
        :param index_number: default index number equals 0, but some parameters provides the possibility of more indexes
        """
        if int(param_number) >= 4096:
            self.__check_for_running_processes(["lcpsim"])
            return self.__set_drive_serial(param_number, value, index_number)

        try:
            result = self.usb.set_drive(param_number, value, index_number)
            return result
        except ConnectionError:
            LOGGER.info(
                f"Failed setting {param_number} to {value}. Trying again")
            result = self.usb.set_drive(param_number, value, index_number)
            LOGGER.info(
                f"Successfully set value of {param_number} to {value} after retry")
            return result

    def __set_drive_serial(self, param_number, value, index_number):

        alreadyconnected = self.serial.is_connected
        if not alreadyconnected:
            self.serial.connect_drive()

        try:
            # if not IndexNr==0:
            #    raise Exception("Index number different from 0 has not yet been implemented
            #    for set_rive above par 40-96")
            self.serial.set_drive(param_number, value, index_number)
        except Exception as error:
            self.serial.disconnect_drive()
            raise error

        if not alreadyconnected:
            self.serial.disconnect_drive()


    @log_trace_calls_dec("powercycle")
    def powercycle(self, ping_attempts=120):
        """
        Restarts the drive and waits (ping_attempts) until it the drive is ready again
        :param ping_attempts: max number of ping attempts. If the drive is not ready after the ping attemps an error is raised.
        """
        self.usb.powercycle(ping_attempts)


    @log_trace_calls_dec("update_powersize")
    def update_powersize(self, typecode, force_update=False):
        """
        Change drive powersize
        :param typecode: Drive typecode (name) (12 characters, e.g. FC-302P4K0T5)
        :param force_update: Default is False. If False, the drive is expected to first be connected. True may be used if the drive cannot be connected (in bootmode).
        """
        self.__check_for_running_processes(["MCT 10 Set-up Software", "lcpsim"])
        self.flasher.flash(typecode, force_flash=force_update)


    @log_trace_calls_dec("ping_drive")
    def ping_drive(self, max_try=120):
        """
        Ping drive. Tries to establish connection to the drive.
        It tries until max. number of tries is reached. Default number of maximum tries is 120.
        :param max_try: max number of ping attempts for the drive to get ready state
        :return: Returns True if drive is ready before the max ping attemps, returns False if the drive is not ready after the ping attemps.
        """

        value = self.usb.ping_drive(attempts=max_try)
        return value

    @log_trace_calls_dec("click_lcp_button")
    def click_lcp_button(self, button):
        """
        Click a button on LCPsim.
        Not all software supports the click button functionality.
        :param button: LCP button to click. Possible buttons to click are: auto_on, hand_on, off, ok, back
        """
        self.__check_for_running_processes(["lcpsim"])
        alreadyconnected = self.serial.is_connected
        if not alreadyconnected:
            self.serial.connect_drive()

        self.serial.click_lcp_button(button)

        if not alreadyconnected:
            self.serial.disconnect_drive()

Class variables

var Instance

Methods

def click_lcp_button(self, button)

Click a button on LCPsim. Not all software supports the click button functionality. :param button: LCP button to click. Possible buttons to click are: auto_on, hand_on, off, ok, back

Expand source code
@log_trace_calls_dec("click_lcp_button")
def click_lcp_button(self, button):
    """
    Click a button on LCPsim.
    Not all software supports the click button functionality.
    :param button: LCP button to click. Possible buttons to click are: auto_on, hand_on, off, ok, back
    """
    self.__check_for_running_processes(["lcpsim"])
    alreadyconnected = self.serial.is_connected
    if not alreadyconnected:
        self.serial.connect_drive()

    self.serial.click_lcp_button(button)

    if not alreadyconnected:
        self.serial.disconnect_drive()
def connect_drive(self)

Establish communication to the drive. Default fieldbus is 'USB 1', which is currently also the only possible solution.

Expand source code
@log_trace_calls_dec("connect_drive")
def connect_drive(self):
    """
    Establish communication to the drive.
    Default fieldbus is 'USB 1', which is currently also the only possible solution.
    """
    self.__check_for_running_processes(["MCT 10 Set-up Software"])
    LOGGER.info('Connecting to drive...')
    self.usb.connect_drive()
    LOGGER.info('Drive connected')
    self.is_connected = self.usb.is_connected
def disconnect_drive(self)

Close communication to drive.

Expand source code
@log_trace_calls_dec("disconnect_drive")
def disconnect_drive(self):
    """ Close communication to drive. """
    self.usb.disconnect_drive()
    sleep(1)
    LOGGER.info('Drive is disconnected')
    self.is_connected = self.usb.is_connected
def get_drive(self, param_number, index_number=0)

Read parameter Expects at least one input (the parameter number), but it is also possible to specify the index. As default index is 0.

Dependency: connect_drive must be called before this keyword

:param param_number: parameter number :param index_number: default index number equals 0, but some parameters provides the possibility of more indexes :return: returns the value of the parameter

Expand source code
@log_trace_calls_dec("get_drive")
def get_drive(self, param_number, index_number=0):
    """
    Read parameter
    Expects at least one input (the parameter number), but it is also possible to specify the
    index. As default index is 0.

    Dependency:
    `connect_drive` must be called before this keyword
    
    :param param_number: parameter number
    :param index_number: default index number equals 0, but some parameters provides the possibility of more indexes
    :return: returns the value of the parameter
    """
    if int(param_number) >= 4096:
        self.__check_for_running_processes(["lcpsim"])
        return self.__get_drive_serial(param_number, index_number)

    return self.usb.get_drive(param_number, index_number, "None")
def ping_drive(self, max_try=120)

Ping drive. Tries to establish connection to the drive. It tries until max. number of tries is reached. Default number of maximum tries is 120. :param max_try: max number of ping attempts for the drive to get ready state :return: Returns True if drive is ready before the max ping attemps, returns False if the drive is not ready after the ping attemps.

Expand source code
@log_trace_calls_dec("ping_drive")
def ping_drive(self, max_try=120):
    """
    Ping drive. Tries to establish connection to the drive.
    It tries until max. number of tries is reached. Default number of maximum tries is 120.
    :param max_try: max number of ping attempts for the drive to get ready state
    :return: Returns True if drive is ready before the max ping attemps, returns False if the drive is not ready after the ping attemps.
    """

    value = self.usb.ping_drive(attempts=max_try)
    return value
def powercycle(self, ping_attempts=120)

Restarts the drive and waits (ping_attempts) until it the drive is ready again :param ping_attempts: max number of ping attempts. If the drive is not ready after the ping attemps an error is raised.

Expand source code
@log_trace_calls_dec("powercycle")
def powercycle(self, ping_attempts=120):
    """
    Restarts the drive and waits (ping_attempts) until it the drive is ready again
    :param ping_attempts: max number of ping attempts. If the drive is not ready after the ping attemps an error is raised.
    """
    self.usb.powercycle(ping_attempts)
def set_drive(self, param_number, value, index_number=0)

Write to parameter Expects at least two inputs (the parameter number and a value), but it is also possible to specify the index. As default index is 0.

Dependency: connect_drive must be called before this keyword

:param param_number: parameter number :param value: value or choice number that the parameter should be set to :param index_number: default index number equals 0, but some parameters provides the possibility of more indexes

Expand source code
@log_trace_calls_dec("set_drive")
def set_drive(self, param_number, value, index_number=0):
    """
    Write to parameter
    Expects at least two inputs (the parameter number and a value), but
    it is also possible to specify the index. As default index is 0.

    Dependency:
    `connect_drive` must be called before this keyword

    :param param_number: parameter number
    :param value: value or choice number that the parameter should be set to
    :param index_number: default index number equals 0, but some parameters provides the possibility of more indexes
    """
    if int(param_number) >= 4096:
        self.__check_for_running_processes(["lcpsim"])
        return self.__set_drive_serial(param_number, value, index_number)

    try:
        result = self.usb.set_drive(param_number, value, index_number)
        return result
    except ConnectionError:
        LOGGER.info(
            f"Failed setting {param_number} to {value}. Trying again")
        result = self.usb.set_drive(param_number, value, index_number)
        LOGGER.info(
            f"Successfully set value of {param_number} to {value} after retry")
        return result
def update_powersize(self, typecode, force_update=False)

Change drive powersize :param typecode: Drive typecode (name) (12 characters, e.g. FC-302P4K0T5) :param force_update: Default is False. If False, the drive is expected to first be connected. True may be used if the drive cannot be connected (in bootmode).

Expand source code
@log_trace_calls_dec("update_powersize")
def update_powersize(self, typecode, force_update=False):
    """
    Change drive powersize
    :param typecode: Drive typecode (name) (12 characters, e.g. FC-302P4K0T5)
    :param force_update: Default is False. If False, the drive is expected to first be connected. True may be used if the drive cannot be connected (in bootmode).
    """
    self.__check_for_running_processes(["MCT 10 Set-up Software", "lcpsim"])
    self.flasher.flash(typecode, force_flash=force_update)