Source code for canlib.kvrlib.discovery

import ctypes as ct
from collections import namedtuple

from .. import EAN, VersionNumber
from .address import Address
from .enums import (Accessibility, AddressTypeFlag, Availability, DeviceUsage,
                    ServiceState, StartInfo)
from .exceptions import KvrBlank
from .structures import kvrAddress, kvrDeviceInfo
from .wrapper import dll

# kvrDiscoveryStoreDevices
# kvrDeviceGetServiceStatusText

_ASSUMED_NUMBER_OF_DEFAULT_ADDRESSES = 20


ServiceStatus = namedtuple('ServiceStatus', "state start_info text")


[docs]def get_default_discovery_addresses(broadcast=True, stored=False): """Retrieve the default discovery addresses Args: broadcast (`bool`): If ``True`` (the default), then the returned list will contain broadcast addresses. stored (`bool`): If ``True`` (defaults to ``False``), then the returned list will contain earlier stored addresses. If both arguments are ``False``, a `ValueError` will be raised. Retruns a `list` of `Address` objects. """ num = _ASSUMED_NUMBER_OF_DEFAULT_ADDRESSES address_list = (kvrAddress * num)() returned_num = ct.c_uint32() if broadcast and stored: address_type_flags = AddressTypeFlag.ALL elif broadcast: address_type_flags = AddressTypeFlag.BROADCAST elif stored: address_type_flags = AddressTypeFlag.STORED else: raise ValueError('broadcast or stored argument must be True') dll.kvrDiscoveryGetDefaultAddresses( address_list, num, ct.byref(returned_num), address_type_flags, ) # kvrDiscoveryGetDefaultAddresses doesn't tell us how large address_list # needs to be for all answers to fit, so we enlarge it by doubling every # time we might have missed some results. while num == returned_num.value: num *= 2 address_list = (kvrAddress * num)() dll.kvrDiscoveryGetDefaultAddresses( address_list, num, ct.byref(returned_num), address_type_flags, ) default_addresses = [Address.from_c(c_addr) for c_addr in address_list[: returned_num.value]] return default_addresses
[docs]def openDiscovery(): """Creates and returns a `Discovery` object Device discovery is normally done using `discover_info_set`. """ handle = ct.c_int32() dll.kvrDiscoveryOpen(ct.byref(handle)) return Discovery(handle)
[docs]def set_clear_stored_devices_on_exit(val): """Sets whether kvrlib should clear stored devices when the application exist""" dll.kvrDiscoveryClearDevicesAtExit(int(val))
[docs]def store_devices(devices): """Store a collection of `DeviceInfo` objects in the registry See `DeviceInfoSet` for a simpler way of dealing with device infos and the registry. Note: Previously stored devices are cleared. """ DevArray = kvrDeviceInfo * len(devices) devarray = DevArray(*(dev.device_info for dev in devices)) dll.kvrDiscoveryStoreDevices(devarray, len(devarray))
[docs]def start_discovery(delay, timeout, addresses=None, report_stored=True): """Start and return a `Discovery` Device discovery is normally done using `discover_info_set`. The returned object should usually be used as a context handler:: with kvrlib.start_discovery(delay=100, timeout=1000) as disc: for result in disc.results(): # process the discovery result print(result) """ disc = openDiscovery() if addresses is None: addresses = get_default_discovery_addresses() disc.addresses = addresses disc.start(delay, timeout, report_stored) return disc
[docs]def stored_devices(): """Read the devices stored in the registry as a tuple of `DeviceInfo` objects""" with start_discovery(0, 0, report_stored=True) as disc: stored = tuple(dev for dev in disc.results() if dev.availability & Availability.STORED) return stored
[docs]class Discovery: """A kvrlib "Discovery" process Most of the time the discovery process can be handled by `.discover_info_set`, which returns the results of the discovery as a `.DeviceInfoSet`. Even when interacted with directly, instnances of this class are normally not instantiated directly, but created using `.start_discovery`, or sometimes using `.openDiscovery`. Instances of this class can be used as context handlers, in which case the discovery will be closed when the context is exited. The discovery will also be closed when the instance is garbage collected, if it hasn't already. """ def __init__(self, handle): self.handle = handle def __del__(self): self.close() def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() addresses = property( doc="""The list of addresses to use for discovery Note: This attribute is write-only. This attribute should be a list of `.Address` objects. If the `.Discovery` object was created by `.start_discovery`, the addresses are automatically set. Otherwise, they must be assigned before `.Discovery.start` can be called. """ ) @addresses.setter def addresses(self, val): address_list = (kvrAddress * len(val))(*(a.to_c() for a in val)) dll.kvrDiscoverySetAddresses(self.handle, address_list, len(val))
[docs] def close(self): """Close the discovery process This function is called when the `Discovery` object is garbage-collected. If the `Discovery` object is used as a context handler, this function will be called when the context exits. """ if self.handle is not None: dll.kvrDiscoveryClose(self.handle) self.handle = None
[docs] def results(self): """Return an iterator of the result from the discovery The results are yielded as `DeviceInfo` objects. """ try: while True: info = kvrDeviceInfo() dll.kvrDiscoveryGetResults(self.handle, ct.byref(info)) yield DeviceInfo(info) except KvrBlank: return
[docs] def start(self, delay, timeout, report_stored=True): """Run the discovery If the `Discovery` object was created by `.start_discovery`, the discovery has already been run, and this function does not need to be called. After the discovery has been run, its results can be retrieved using `Discovery.results`. Args: delay (`int`): The millisecond delay between sending discovery messages to addresses in the address list. timeout (`int`): The millisecond timeout after all discovery messages have been sent, before returning. report_stored (`bool`): if ``True`` (the default), stored devices will be discovered. """ dll.kvrDiscoveryStartEx(self.handle, delay, timeout, int(report_stored)) return self
[docs]class DeviceInfo: """Information about a device that can be written to the registry See `.DeviceInfoSet` for information about how to get `DeviceInfo` objects, process them, and then write them to the registry. """ def __init__(self, device_info=None): if device_info is None: device_info = kvrDeviceInfo() self.device_info = device_info def __repr__(self): return '<{cls} {name}({hostname}) - {ean}:{serial}>'.format( cls=self.__class__.__name__, name=self.name, hostname=self.hostname, ean=self.ean, serial=self.serial, ) @property def accessibility(self): """`~canlib.kvrlib.Accessibility`: The accessibility of this device""" return Accessibility(self.device_info.accessibility) @accessibility.setter def accessibility(self, val): if val not in Accessibility: raise TypeError(val + " not a valid Accessibility") self.device_info.accessibility = int(val) @property def availability(self): """`~canlib.kvrlib.Availability`: The availability of this device""" return Availability(self.device_info.availability) @property def base_station_id(self): """`~canlib.kvrlib.Address`: Address of the base station""" return Address.from_c(self.device_info.base_station_id) @property def client_address(self): """`~canlib.kvrlib.Address`: Address of connected client""" return Address.from_c(self.device_info.client_address) @client_address.setter def client_address(self, val): if not isinstance(val, Address): raise TypeError(str(type(val)) + " received, exptected " + str(Address)) self.device_info.client_address = val.to_c() @property def connect(self): """`bool`: Whether the service should connect to this device""" val = self.device_info.request_connection # The attribute can also, secretly, be set to ``3`` to flush dns. if val in (0, 1): val = bool(val) return val @connect.setter def connect(self, val): self.device_info.request_connection = int(val) @property def device_address(self): """`~canlib.kvrlib.Address`: Address of remote device""" return Address.from_c(self.device_info.device_address) @device_address.setter def device_address(self, val): if not isinstance(val, Address): raise TypeError(str(type(val)) + " received, exptected " + str(Address)) self.device_info.device_address = val.to_c() @property def ean(self): """`~canlib.ean.EAN`: EAN of device""" ean = EAN.from_hilo((self.device_info.ean_hi, self.device_info.ean_lo)) return ean @ean.setter def ean(self, val): ean = EAN(val) self.device_info.ean_hi, self.device_info.ean_lo = ean.hilo() @property def firmware_version(self): """`~canlib.versionnumber.VersionNumber`: Firmware version""" version = VersionNumber( self.device_info.fw_major_ver, self.device_info.fw_minor_ver, self.device_info.fw_build_ver, ) return version encryption_key = property( doc="""`bytes`: The encryption key to use when encrypting communication Note: This attribute is write-only. """ ) @encryption_key.setter # noqa def encryption_key(self, val): key = ct.create_string_buffer(val) dll.kvrDiscoverySetEncryptionKey(self.device_info, key) @property def hostname(self): """`str`: DNS hostname if available, otherwise an empty string""" return self.device_info.host_name.decode('utf-8') @hostname.setter def hostname(self, val): self.device_info.host_name = val.encode('utf-8') @property def name(self): """`str`: User-defined name of device""" try: name = self.device_info.name.decode('utf-8') except UnicodeDecodeError: name = self.device_info.name.decode('cp1252') return name @name.setter def name(self, val): self.device_info.name = val.encode('utf-8') password = property( doc="""`str`: The accessibility password to use when connecting to a device Note: This attribute is write-only. """ ) @password.setter # noqa def password(self, val): password = ct.create_string_buffer(val.encode('utf-8')) dll.kvrDiscoverySetPassword(self.device_info, password) @property def serial(self): """`int`: The serial number of the device""" return self.device_info.ser_no @serial.setter def serial(self, val): if not isinstance(val, int): raise TypeError("Serial number must be an int") self.device_info.ser_no = val @property def service_status(self): """`~canlib.kvrlib.ServiceStatus`: A tuple with the service status of the device The returned tuple has the format ``(state, start_info, text)``, where ``state`` is a `.ServiceState`, ``start_info`` is a `.StartInfo`, and ``text`` is a `str` with local connection status. """ c_state = ct.c_int32() c_start_info = ct.c_int32() c_text = ct.create_string_buffer(128) dll.kvrDeviceGetServiceStatus(self.device_info, c_state, c_start_info) dll.kvrDeviceGetServiceStatusText(self.device_info, c_text, ct.sizeof(c_text)) # python 2 requires converting from long to int state = ServiceState(int(c_state.value)) start_info = StartInfo(int(c_start_info.value)) text = c_text.value return ServiceStatus(state, start_info, text) @property def stored(self): """`bool`: Whether this `DeviceInfo` was read from the registry""" return self.device_info.availability is Availability.STORED @property def usage(self): """`~canlib.kvrlib.DeviceUsage`: Usage status (Free/Remote/USB/Config)""" return DeviceUsage(self.device_info.usage)
[docs] def info(self): """Create a string with information about the `DeviceInfo` To be used when the ``str()`` representation is not detailed enough. """ attrs = ( 'ean', 'serial', 'name', 'hostname', 'connect', 'firmware_version', 'device_address', 'client_address', 'base_station_id', 'usage', 'accessibility', 'availability', ) info = '\n'.join(format(attr, '<20') + str(getattr(self, attr)) for attr in attrs) return info
[docs] def update(self, other): """Update the attributes of this instance This function takes a `dict`, and will set the attributes given by the keys to the corresponding value (on this object). """ for name, val in other.items(): setattr(self, name, val)