Source code for canlib.device

import itertools
import sys
from collections import namedtuple
from importlib import import_module

from . import deprecation


class _Libs(object):
    """Class to facilitate importing canlib modules on-demand

    The main problem this class solves is that kvrlib is not available on
    linux, so unconditionally importing kvrlib is not an option. However, it is
    also preferable that simply importing this module loads most of pycanlib,
    if a user wants to use `Device` to interact with canlib, they should not
    need to load e.g. kvmlib.

    This class is instanced, once, in device.py as `_libs`. Any time a library,
    for example `canlib`, is used in device.py it will be accessed with
    ``_libs.canlib``. The first time this happens, there will be no ``canlib``
    attribute on ``libs``, and `_Libs.__getattr__` will be called. This looks
    up the library name provided in `_Libs._imports`, which should give a
    no-argument function that imports that library and returns
    it. `_Libs.__getattr__` then saves the returned module as an attribute,
    making sure that `_Libs.__getattr__` is only called the first time any
    particular library is requested.

    """

    def import_kvrlib(self=None):
        try:
            module = import_module('canlib.kvrlib')
        except AttributeError:
            if not sys.platform.startswith('win'):
                raise ImportError("kvrlib is only available on Windows")
            else:
                raise
        else:
            return module

    _imports = {
        'canlib': lambda: import_module('canlib.canlib'),
        'kvmlib': lambda: import_module('canlib.kvmlib'),
        'linlib': lambda: import_module('canlib.linlib'),
        'kvrlib': import_kvrlib,
    }

    def __getattr__(self, name):
        setattr(self, name, self._imports[name]())
        return getattr(self, name)


_libs = _Libs()


_ChannelInfo = namedtuple(
    '_channel_info',
    "channel_number ean serial channel_name",
)
_ChannelInfo.__new__.__defaults__ = tuple(None for _ in _ChannelInfo._fields)


[docs]def connected_devices(): """Get all currently connected devices as `Device` Returns an iterator of `Device` object, one object for every physical device currently connected. .. versionadded:: 1.6 .. versionchanged:: 1.7 `Device.last_channel_number` will now be set. """ last_device = None for curr_channel in itertools.count(): try: data = _libs.canlib.ChannelData(curr_channel) ean = data.card_upc_no serial = data.card_serial_no except _libs.canlib.CanNotFound: return else: dev = Device(ean=ean, serial=serial) dev.last_channel_number = curr_channel if dev != last_device: yield dev last_device = dev
def _find_channel(info, last=None): assert info is not None if last is not None: try: new_info = _match_channel(last, info) except _libs.canlib.CanNotFound: new_info = None if new_info is not None: return new_info for curr_channel in itertools.count(): new_info = _match_channel(curr_channel, info) if new_info is not None: return new_info def _match_channel(channel_number, info): target_ch = info.channel_number target_ean = info.ean target_serial = info.serial target_name = info.channel_name if (target_ch is not None) and (channel_number != target_ch): return None data = _libs.canlib.ChannelData(channel_number) ean = data.card_upc_no if (target_ean is not None) and (ean != target_ean): return None serial = data.card_serial_no if (target_serial is not None) and (serial != target_serial): return None name = data.custom_name if (target_name is not None) and (name != target_name): return None chan_no_on_card = data.chan_no_on_card # The Device channel_number should always point at the first channel on card channel_number -= chan_no_on_card return _ChannelInfo(channel_number, ean, serial, name)
[docs]class Device(object): """Class for keeping track of a physical device This class represents a physical device regardless of whether it is currently connected or not, and on which channel it is connected. If the device is currently connected, `Device.find` can be used to get a `Device` object:: dev = Device.find(ean=EAN.from_string('67890-1')) `Device.find` supports searching for devices based on a variety of information, and will return a `Device` object corresponding to the first physical device that matched its arguments. In this case, it would be the first device found with an EAN of 73-30130-67890-1. If the device wanted is not currently connected, `Device` objects can be created with their EAN and serial number (as this is the minimal information needed to uniquely identify a specific device):: dev = Device(ean=EAN.from_string('67890-1'), serial=42) Two `Device` objects can be checked for equality (whether they refer to the same device) and be converted to a `str`. `Device.probe_info` can also be used for a more verbose string representation that queries the device (if connected) for various pieces of information. This class also provides functions for creating the other objects of `canlib`: * `canlib.Channel` -- `Device.channel` * `canlib.ChannelData` -- `Device.channel_data` * `canlib.IOControl` -- `Device.iocontrol` * `kvmlib.Memorator`-- `Device.memorator` * `linlib.Channel` -- `Device.lin_master` and `Device.lin_slave` Attributes: Device.ean (`canlib.EAN`): The EAN of this device. Device.serial (`int`): The serial number of this device. last_channel_number (`int`): The channel number this device was last found on (used as an optimization; while the device stays on the same CANlib channel there is no need for a linear search of all channels). .. versionadded:: 1.6 """ __slots__ = ("ean", "serial", "last_channel_number")
[docs] @classmethod def find(cls, channel_number=None, ean=None, serial=None, channel_name=None): """Searches for a specific device Goes through all CANlib channels (from zero and up), until one of them matches the given arguments. If an argument is omitted or `None`, any device will match it. If no devices matches a `canlib.CanNotFound` exception will be raised. Args: channel_number (`int`): Find a device on this CANlib channel (number). ean (`canlib.EAN`): Find a device with this EAN. serial (`int`): Find a device with this serial number. channel_name (`str`): Find a device with this CANlib channel name. """ target_info = _ChannelInfo(channel_number, ean, serial, channel_name) info = _find_channel(target_info) dev = cls(ean=info.ean, serial=info.serial) dev.last_channel_number = info.channel_number # last known CANlib channel return dev
def __init__(self, ean, serial): # setval = super().__setattr__ # setval("ean", ean) # setval("serial", serial) # super() is annoying in python 2 object.__setattr__(self, "ean", ean) object.__setattr__(self, "serial", serial) # The last channel number this device was found on self.last_channel_number = None # required in Python 2 def __ne__(self, other): # required in python 2 eq = self == other if eq is NotImplemented: return eq else: return not eq def __eq__(self, other): if not isinstance(other, Device): return NotImplemented return self.ean == other.ean and self.serial == other.serial def __hash__(self): return hash((self.ean, self.serial)) def __setattr__(self, name, value): if name != "last_channel_number": raise TypeError(self.__class__.__name__ + " only supports setting last_channel_number") else: # super().__setattr__(name, value) # super() is annoying in python 2 object.__setattr__(self, name, value) def __repr__(self): return "{cls}(ean={ean!r}, serial={serial!r})".format( cls=self.__class__.__name__, ean=self.ean, serial=self.serial ) def __str__(self): return "{ean}:{serial}".format(ean=self.ean, serial=self.serial)
[docs] @deprecation.deprecated.favour(".open_channel()") def channel(self, *args, **kwargs): """A `canlib.Channel` for this device's first channel The experimental attribute `_chan_no_on_card` may be given, the `int` provided will be added (without any verifications) to the `channel_number` where this device was found on, and may thus be used to open a specific local channel on this device. NOTE: When using the `_chan_no_on_card` attribute, you must make sure that the card actually have the assumed number of local channels. Using this argument with a too large `int` could return a channel belonging to a different device. Arguments to `canlib.openChannel` other than the channel number can be passed to this function. .. versionchanged:: 1.13 Added attribute `_chan_no_on_card` .. deprecated:: 1.16 Use `open_channel` instead """ _chan_no_on_card = 0 if '_chan_no_on_card' in kwargs: _chan_no_on_card = kwargs.get("_chan_no_on_card") del kwargs['_chan_no_on_card'] # Should we somehow (without affecting performance) check that _chan_no_on_card is small enough? channel_number = self.channel_number() + _chan_no_on_card return _libs.canlib.openChannel(channel_number, *args, **kwargs)
[docs] def channel_data(self): """A `canlib.ChannelData` for this device's first channel""" return _libs.canlib.ChannelData(self.channel_number())
[docs] def channel_number(self): """An `int` with this device's CANlib channel number""" info = _find_channel( _ChannelInfo(ean=self.ean, serial=self.serial), last=self.last_channel_number, ) self.last_channel_number = info.channel_number return info.channel_number
[docs] def iocontrol(self): """A `canlib.IOControl` for this device's first channel""" return _libs.canlib.IOControl(self.open_channel())
[docs] def isconnected(self): """A `bool` whether this device can currently be found""" try: self.channel_number() except _libs.canlib.CanNotFound: return False else: return True
[docs] def issubset(self, other): """Check if device is a subset of other Device. This can be used to see if a found device fulfills criteria specified in other. Setting an attribute to None is regarded as a Any. This means that e.g. any serial number will be a subset of a serial number specified as None. .. versionadded:: 1.9 """ if other.serial is not None and self.serial != other.serial: return False if other.ean is not None and self.ean != other.ean: return False return True
[docs] def lin_master(self, *args, **kwargs): """A `linlib.Channel` master for this device's first channel Arguments to `linlib.openMaster` other than the channel number can be passed to this function. """ return _libs.linlib.openMaster(self.channel_number(), *args, **kwargs)
[docs] def lin_slave(self, *args, **kwargs): """A `linlib.Channel` slave for this device's first channel Arguments to `linlib.openSlave` other than the channel number can be passed to this function. """ return _libs.linlib.openSlave(self.channel_number(), *args, **kwargs)
[docs] def memorator(self, *args, **kwargs): """A `canlib.kvmlib.Memorator` for this device's first channel Arguments to `canlib.openChannel` other than the channel number can be passed to this function. """ return _libs.kvmlib.openDevice(self.channel_number(), *args, **kwargs)
[docs] def open_channel(self, chan_no_on_card=0, **kwargs): """A `canlib.Channel` for this device's first channel The parameter `chan_no_on_card` will be added (without any verifications) to the `channel_number` where this device was found on, and may thus be used to open a specific local channel on this device. NOTE: When using the `chan_no_on_card` parameter, you must make sure that the card actually have the assumed number of local channels. Using this parameter with a too large `int` could return a channel belonging to a different device. Arguments to `canlib.open_channel`, other than the channel number, can be passed to this function, but must be passed as keyword arguments. .. versionadded:: 1.16 """ # Should we somehow (without affecting performance) check that # chan_no_on_card is small enough? channel_number = self.channel_number() + chan_no_on_card return _libs.canlib.openChannel(channel=channel_number, **kwargs)
[docs] def probe_info(self): """A `str` with information about this device This function is useful when the `Device`'s `str()` does not give enough information while debugging. When the device is connected various pieces of information such as the device name, firmware, and driver name is given. When the device is not connected only basic information can be given. Note: Never inspect the return value of this function, only use it for displaying information. Exactly what is shown depends on whether the device is connected or not, and is not guaranteed to stay consistent between versions. """ try: data = self.channel_data() infos = { 'CANlib Channel': self.channel_number(), 'Card Number': data.card_number, 'Device': data.channel_name, 'Driver Name': data.driver_name, 'EAN': self.ean, 'Firmware': data.card_firmware_rev, 'Serial Number': self.serial, } except _libs.canlib.CanNotFound: infos = { 'EAN': self.ean, 'Last CANlib Channel': self.last_channel_number, 'Serial Number': self.serial, } width = max(len(label) for label in infos.keys()) # We sort the keys before printing as pre python 3.6 the order of # dictionary keys is arbitrary, and can change between calls. text = '\n'.join(key.ljust(width) + ': ' + str(val) for key, val in sorted(infos.items())) return text
[docs] def remote(self, *args, **kwargs): """A `canlib.kvrlib.RemoteDevice` for this device Arguments to `canlib.kvrlib.openDevice` other than the channel number can be passed to this function. """ return _libs.kvrlib.openDevice(self.channel_number(), *args, **kwargs)