import ctypes as ct
import struct
import weakref
from collections import namedtuple
from time import sleep
from .. import deprecation
from ..device import Device
from ..frame import Frame
from . import constants as const
from . import iopin, wrapper
from .busparams import BusParamsTq
from .channeldata import HandleData
from .enums import (DeviceMode, Driver, MessageFlag, Open,
ScriptRequest, ScriptStatus, ScriptStop, Stat)
from .envvar import EnvVar
from .exceptions import CanError
from .iocontrol import IOControl
from .structures import CanBusParamsTq, CanBusStatistics
dll = wrapper.dll
ErrorCounters = namedtuple('ErrorCounters', 'tx rx overrun')
"""Error counters returned by `.Channel.read_error_counters`."""
# canBITRATE_XXX and canFD_BITRATE_XXX constants are still supported.
# However, they are deprecated and as such not mentioned in the docstring.
[docs]def openChannel(channel, flags=0, bitrate=None, data_bitrate=None):
"""Open CAN channel
Retrieves a `Channel` object for the given CANlib channel number using
the supplied flags.
Example usage:
>>> bitrate = canlib.busparams.BusParamsTq(
... tq=40,
... phase1=5,
... phase2=6,
... sjw=4,
... prescaler=2,
... prop=28,
... )
>>> data_bitrate = canlib.busparams.BusParamsTq(
... tq=40,
... phase1=31,
... phase2=8,
... sjw=2,
... prescaler=2,
... )
>>> ch = canlib.openChannel(
... channel=0,
... flags=canlib.Open.CAN_FD,
... bitrate=bitrate,
... data_bitrate=data_bitrate,
... )
Note:
If both `bitrate` and `data_bitrate` is given, both must be of the same
type, i.e. either BusParamsTq or BITRATE flags. It is not supported to
mix the two.
Args:
channel (`int`): CANlib channel number
flags (`int`): Flags, a combination of the `~canlib.canlib.Open` flag values.
Default is zero, i.e. no flags.
bitrate (`~busparams.BusParamsTq` or `~canlib.canlib.Bitrate` or `~canlib.canlib.BitrateFD`):
The desired bitrate. If the bitrate is not a
`~busparams.BusParamsTq`, the predefined `~canlib.canlib.Bitrate`
values are used for classic CAN and `~canlib.canlib.BitrateFD`
values are used for CAN FD, e.g. `canlib.Bitrate.BITRATE_1M` and
`canlib.BitrateFD.BITRATE_1M_80P`. For CAN FD, this parameter gives the
arbitration bitrate.
data_bitrate (`~.busparams.BusParamsTq` or `~canlib.canlib.BitrateFD`):
The desired data bitrate for CAN FD. If not `~busparams.BusParamsTq` is
given, the predefined `~canlib.canlib.BitrateFD` values are used, e.g.
`canlib.BitrateFD.BITRATE_500K_80p`. This parameter is only used
when opening a CAN FD channel.
Returns:
A `Channel` object created with channel and flags
.. versionadded:: 1.6
The *bitrate* and *data_bitrate* arguments was added.
.. versionchanged:: 1.16
The *bitrate* and *data_bitrate* arguments now accept `~busparams.BusParamsTq` objects.
.. versionchanged:: 1.17
Added support for `~canlib.canlib.Bitrate` and `~canlib.canlib.BitrateFD` enumerations.
"""
ch = Channel(channel, flags)
if not ch.is_can_fd() and data_bitrate is not None:
ch.close()
raise TypeError("data_bitrate requires CAN FD flag.")
if bitrate is not None:
if ch.is_can_fd() and data_bitrate is None:
ch.close()
raise ValueError("data_bitrate must be specified for CAN FD channels.")
if isinstance(bitrate, BusParamsTq):
ch.set_bus_params_tq(bitrate, data_bitrate)
else:
ch.setBusParams(bitrate)
if ch.is_can_fd():
ch.setBusParamsFd(data_bitrate)
return ch
[docs]class ScriptText(str):
"""Text returned by `Channel.scriptGetText`
Subclass of built-in `str`, so it can be used just like a normal string.
It also has the following attributes:
Args:
text (`str`): Text content.
slot (`int`): Which script-slot the text came from.
time (`int`): Timestamp of when the text was printed.
flags (`~canlib.canlib.Stat`): Any status flags associated with the text.
.. versionadded:: 1.7
"""
def __new__(cls, text, slot, time, flags):
obj = super().__new__(cls, text)
obj.slot = slot
obj.time = time
obj.flags = flags
return obj
[docs]class Channel:
"""Helper class that represents a CANlib channel.
This class wraps the canlib class and tries to implement a more Pythonic
interface to CANlib.
Channels are automatically closed on garbage collection, and can
also be used as context managers in which case they close as soon as the
context exits.
Attributes:
envvar (`.EnvVar`): Used to access *t* program environment variables
"""
_iocontrol_ref = lambda self: None # noqa
_handledata_ref = lambda self: None # noqa
_MAX_MSG_SIZE = 64
_bus_params_tq_err_msg = (
"set_bus_params_tq() and get_bus_params_tq() are not "
"supported on your device. Use setBusParams() and getBusParams() "
"instead, or try upgrading your firmware.\n\n"
"Product: {}\n"
"Firmware version: {}.{}.{}"
)
def __init__(self, channel_number, flags=0):
self.index = channel_number
# Initialize handle here in case canOpenChannel triggers an exception.
self.handle = -1
if flags == 0:
self.flags = Open.NOFLAG
else:
self.flags = flags
self.handle = dll.canOpenChannel(channel_number, flags)
self.envvar = EnvVar(self)
self._device = None
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def __del__(self):
try:
self.close()
except CanError:
# For some reason, in python 2 we get "Handle is invalid (-10)
# here. Probably something to do with the unpredictable way
# __del__ is called.
pass
def _calculate_length(self, dlc, message_flags):
if message_flags & MessageFlag.FDF:
length = dlc
else:
length = min(8, dlc)
return length
[docs] def announceIdentityEx(self, item, value):
"""AnnounceIdentityEx function is used by certain OEM applications.
.. versionadded:: 1.17
"""
c_value = ct.c_int(value)
dll.kvAnnounceIdentityEx(self.handle, item, ct.byref(c_value), ct.sizeof(c_value))
@property
def iocontrol(self):
"""`IOControl`: ``canIoCtl`` helper object for this channel
See the documentation for `IOControl` for how it can be used to perform
all functionality of the C function ``canIoCtl``.
Example usage:
>>> from canlib import canlib
>>> ch = canlib.openChannel(channel=0)
>>> ch.iocontrol.timer_scale
1000
>>> ch.iocontrol.local_txecho = False
"""
# Because handles are only closed on garbage collection, a weak
# reference is used to prevent a cyclic reference that would delay
# garbage collection (in CPython). As a side effect, the iocontrol
# objects are optimized to only stay alive while someone needs them.
ioc = self._iocontrol_ref()
if ioc is None:
ioc = IOControl(self)
self._iocontrol_ref = weakref.ref(ioc)
return ioc
@property
def channel_data(self):
"""`~.channeldata.HandleData`: ``canGetHandleData`` helper object for this channel
See the documentation for `ChannelData`/`~.channeldata.HandleData` for how it can be
used to perform all functionality of the C function
``canGetHandleData``.
.. versionadded:: 1.13
"""
# Because handles are only closed on garbage collection, a weak
# reference is used to prevent a cyclic reference that would delay
# garbage collection (in CPython). As a side effect, the handledata
# objects are optimized to only stay alive while someone needs them.
chd = self._handledata_ref()
if chd is None:
chd = HandleData(self)
self._handledata_ref = weakref.ref(chd)
return chd
[docs] def close(self):
"""Close CANlib channel
Closes the channel associated with the handle. If no other threads are
using the CAN circuit, it is taken off bus.
Note:
It is normally not necessary to call this function directly, as the
internal handle is automatically closed when the `Channel` object
is garbage collected.
"""
if self.handle != -1:
dll.canClose(self.handle)
self.handle = -1
[docs] def canAccept(self, envelope, flag):
"""Set acceptance filters mask or code.
This routine sets the message acceptance filters on a CAN channel.
Setting flag to `AcceptFilterFlag.NULL_MASK` (0) removes the filter.
Note that not all CAN boards support different masks for standard and
extended CAN identifiers.
Args:
envelope: The mask or code to set.
flag: Any of `AcceptFilterFlag`
"""
dll.canAccept(self.handle, envelope, flag)
[docs] def canSetAcceptanceFilter(self, code, mask, is_extended=False):
"""Set message acceptance filter.
This routine sets the message acceptance filters on a CAN channel.
The message is accepted `if id AND mask == code` (this is actually
imlepemented as `if ((code XOR id) AND mask) == 0`).
Using standard 11-bit CAN identifiers and setting
* mask = 0x7f0,
* code = 0x080
accepts CAN messages with standard id 0x080 to 0x08f.
Setting the *mask* to canFILTER_NULL_MASK (0) removes the filter.
Note that not all CAN boards support different masks for standard and
extended CAN identifiers.
Args:
mask (`int`): A bit mask that indicates relevant bits with '1'.
code (`int`): The expected state of the masked bits.
is_extended (`bool`): If `True`, both mask and code applies
to 29-bit CAN identifiers.
"""
dll.canSetAcceptanceFilter(self.handle, code, mask, is_extended)
[docs] def device(self):
"""Get a Device object from the current channel
Returns:
`~canlib.Device`: Device used by this channel,
.. versionadded:: 1.16
"""
if self._device is None:
chd = self.channel_data
ean = chd.card_upc_no
serial = chd.card_serial_no
self._device = Device(ean=ean, serial=serial)
return self._device
[docs] def setBusParams(self, freq, tseg1=0, tseg2=0, sjw=0, noSamp=0, syncmode=0):
"""Set bus timing parameters for classic CAN
This function sets the bus timing parameters for the specified CAN
controller.
The library provides default values for tseg1, tseg2, sjw and noSamp
when freq is a `~canlib.canlib.Bitrate`, e.g. `.Bitrate.BITRATE_1M`.
If freq is any other value, no default values are supplied by the
library.
If you are using multiple handles to the same physical channel, for
example if you are implementing a multithreaded application, you must
call `~.Channel.busOn()` once for each handle. The same applies to
`~.Channel.busOff()` - the physical channel will not go off bus until
the last handle to the channel goes off bus.
Args:
freq: Bitrate in bit/s.
tseg1: Number of quanta from (but not including) the Sync Segment
to the sampling point.
tseg2: Number of quanta from the sampling point to the end of
the bit.
sjw: The Synchronization Jump Width, can be 1, 2, 3, or 4.
nosamp: The number of sampling points, only 1 is supported.
syncMode: Unsupported and ignored.
.. versionchanged:: 1.17
Now accepts `~canlib.canlib.Bitrate` enumerations.
"""
dll.canSetBusParams(self.handle, freq, tseg1, tseg2, sjw, noSamp, syncmode)
[docs] def getBusParams(self):
"""Get bus timing parameters for classic CAN.
This function retrieves the current bus parameters for the specified
channel.
Returns: A tuple containing:
*freq*: Bitrate in bit/s.
*tseg1*: Number of quanta from but not including the Sync
Segment to the sampling point.
*tseg2*: Number of quanta from the sampling point to the
end of the bit.
*sjw*: The Synchronization Jump Width, can be 1, 2, 3, or 4.
*noSamp*: The number of sampling points, only 1 is supported.
*syncmode*: Unsupported, always read as zero.
"""
freq = ct.c_long()
tseg1 = ct.c_uint()
tseg2 = ct.c_uint()
sjw = ct.c_uint()
noSamp = ct.c_uint()
syncmode = ct.c_uint()
dll.canGetBusParams(
self.handle,
ct.byref(freq),
ct.byref(tseg1),
ct.byref(tseg2),
ct.byref(sjw),
ct.byref(noSamp),
ct.byref(syncmode),
)
return (
freq.value,
tseg1.value,
tseg2.value,
sjw.value,
noSamp.value,
syncmode.value,
)
[docs] def setBusParamsFd(self, freq_brs, tseg1_brs=0, tseg2_brs=0, sjw_brs=0):
"""Set bus timing parameters for BRS in CAN FD
This function sets the bus timing parameters used in BRS (Bit rate
switch) mode for the current CANlib channel.
The library provides default values for tseg1_brs, tseg2_brs and
sjw_brs when freq is a `~canlib.canlib.BitrateFD` value, e.g.
`.BitrateFD.BITRATE_1M_80P`.
If freq is any other value, no default values are supplied by the
library.
For finding out whether a channel is opened using `~Open.CAN_FD`
or `~Open.CAN_FD_NONISO` flags, see `~Channel.is_can_fd`
Args:
freq_brs: Bitrate in bit/s.
tseg1_brs: Number of quanta from (but not including) the Sync
Segment to the sampling point.
tseg2_brs: Number of quanta from the sampling point to the
end of the bit.
sjw_brs: The Synchronization Jump Width.
.. versionchanged:: 1.17
Now accepts `~canlib.canlib.BitrateFD` enumerations.
"""
if not self.is_can_fd():
raise TypeError(
"setBusParamsFd() is only supported on channels "
"opened with the CAN_FD or CAN_FD_NONISO flags."
)
dll.canSetBusParamsFd(self.handle, freq_brs, tseg1_brs, tseg2_brs, sjw_brs)
[docs] def getBusParamsFd(self):
"""Get bus timing parameters for BRS in CAN FD.
This function retrieves the bus current timing parameters used in BRS
(Bit rate switch) mode for the current CANlib channel.
The library provides default values for tseg1_brs, tseg2_brs and
sjw_brs when freq is a `~canlib.canlib.BitrateFD` value.
If freq is any other value, no default values are supplied by the
library.
For finding out whether a channel is opened using `~Open.CAN_FD`
or `~Open.CAN_FD_NONISO` flags, see `~Channel.is_can_fd()`
Returns: A tuple containing:
*freq_brs*: Bitrate in bit/s.
*tseg1_brs*: Number of quanta from (but not including) the Sync
Segment to the sampling point.
*tseg2_brs*: Number of quanta from the sampling point to the
end of the bit.
*sjw_brs*: The Synchronization Jump Width.
"""
if not self.is_can_fd():
raise TypeError(
"getBusParamsFd() is only supported on channels "
"opened with the CAN_FD or CAN_FD_NONISO flags."
)
freq_brs = ct.c_long()
tseg1_brs = ct.c_uint()
tseg2_brs = ct.c_uint()
sjw_brs = ct.c_uint()
dll.canGetBusParamsFd(
self.handle,
ct.byref(freq_brs),
ct.byref(tseg1_brs),
ct.byref(tseg2_brs),
ct.byref(sjw_brs),
)
return (freq_brs.value, tseg1_brs.value, tseg2_brs.value, sjw_brs.value)
[docs] def set_bus_params_tq(self, nominal, data=None):
"""Set bus timing parameters, using time quanta
This function sets the bus timing parameters for the specified CAN
controller. When setting bus parameters for CAN FD, both `nominal` and
`data` must be given.
If you are using multiple handles to the same physical channel, for
example if you are implementing a multithreaded application, you must
call `busOff()` once for each handle. The physical channel will not go
off bus until the last handle to the channel goes off bus.
The same applies to `busOn()`.
Args:
nominal (`~canlib.canlib.busparams.BusParamsTq`): Nominal Bus
timing parameters, also used for classic CAN
data (`~canlib.canlib.busparams.BusParamsTq`): Bus timing parameters
for data rate in CAN FD.
.. versionadded:: 1.16
"""
if self.is_can_fd():
if data is None:
raise ValueError("data must be specified for CAN FD channels.")
else:
dll.canSetBusParamsFdTq(
self.handle,
CanBusParamsTq._frombusparamstq(nominal),
CanBusParamsTq._frombusparamstq(data),
)
else:
dll.canSetBusParamsTq(
self.handle, CanBusParamsTq._frombusparamstq(nominal)
)
[docs] def get_bus_params_tq(self):
"""Get bus timing parameters, in time quanta
This function retrieves the current bus parameters for the specified
channel. Only the returned `nominal` parameter is valid when classic CAN
is in use.
Returns: A `tuple` containing:
*nominal*: (`~canlib.canlib.busparams.BusParamsTq`) Nominal bus
timing parameters, also used in classic CAN.
*data*: (`~canlib.canlib.busparams.BusParamsTq`) Bus timing
parameters for data rate in CAN FD.
.. versionadded:: 1.16
"""
nominal = CanBusParamsTq()
if self.is_can_fd():
data = CanBusParamsTq()
dll.canGetBusParamsFdTq(self.handle, ct.byref(nominal), ct.byref(data))
return (
nominal._tobusparamstq(),
data._tobusparamstq(),
)
else:
dll.canGetBusParamsTq(self.handle, ct.byref(nominal))
return (
nominal._tobusparamstq(),
None,
)
[docs] def bitrate_to_BusParamsTq(self, freq_a, freq_d=None):
"""Calculate bus parameters based on predefined frequencies.
This function uses the `~canlib.canlib.Bitrate` and
`~canlib.canlib.BitrateFD` values to create bus parameters for use with
the new bus parameter API.
Args:
freq_a (`~canlib.canlib.Bitrate` or `~canlib.canlib.BitrateFD`): The bitrate for classic CAN channels specified as a `~canlib.canlib.Bitrate`, or the arbitration bitrate for CAN FD channels specified as a `~canlib.canlib.BitrateFD`.
freq_d (`~canlib.canlib.BitrateFD`): Data bitrate for CAN FD channels.
Returns:
(`nominal`, `None`) for classic CAN, where `nominal` is a `~busparams.BusParamsTq` object.
(`nominal`, `data`) for CAN FD, where both `nominal` and `data` are `~busparams.BusParamsTq` objects, representing the arbitration bitrate and the data bitrate, respectively.
.. versionadded:: 1.17
"""
if self.is_can_fd() and freq_d is None:
raise ValueError("Data bitrate must be specified for CAN FD channels.")
nominal = CanBusParamsTq()
if self.is_can_fd():
data = CanBusParamsTq()
dll.kvBitrateToBusParamsFdTq(
self.handle,
freq_a,
freq_d,
ct.byref(nominal),
ct.byref(data))
return (nominal._tobusparamstq(), data._tobusparamstq())
else:
dll.kvBitrateToBusParamsTq(
self.handle,
freq_a,
ct.byref(nominal))
return (nominal._tobusparamstq(), None)
[docs] def busOn(self):
"""Takes the specified channel on-bus.
If you are using multiple handles to the same physical channel, for
example if you are implementing a multithreaded application, you must
call busOn() once for each handle.
"""
dll.canBusOn(self.handle)
[docs] def busOff(self):
"""Takes the specified channel off-bus.
Closes the channel associated with the handle. If no other threads are
using the CAN circuit, it is taken off bus. The handle can not be used
for further references to the channel.
"""
dll.canBusOff(self.handle)
def _request_bus_statistics(self):
dll.canRequestBusStatistics(self.handle)
def _get_bus_statistics(self):
statistics = CanBusStatistics()
dll.canGetBusStatistics(self.handle, ct.byref(statistics), ct.sizeof(statistics))
return statistics
[docs] def get_bus_statistics(self):
"""Return bus statistics.
Returns:
`structures.CanBusStatistics`
"""
statistics = CanBusStatistics()
dll.canRequestBusStatistics(self.handle)
sleep(0.01) # we need to wait for the statistics to be available in windows.
dll.canGetBusStatistics(self.handle, ct.byref(statistics), ct.sizeof(statistics))
return statistics
[docs] def getBusOutputControl(self):
"""Get driver type
This function retrieves the current CAN controller driver type. This
corresponds loosely to the bus output control register in the CAN
controller, hence the name of this function.
Note:
Not all CAN driver types are supported on all cards.
Returns:
drivertype (`.canlib.Driver`): Driver type to set.
.. versionadded:: 1.11
"""
driver_type = ct.c_uint()
dll.canGetBusOutputControl(self.handle, driver_type)
return Driver(driver_type.value)
# qqdaca v1.5 This function is complicated by the fact that previously it
# had the signature of write_wait, and we still want to support that for
# one more version at least.
#
# It would be preferable to merge `write` and `writeWait` by giving `write`
# an optional `wait` argument. If specified, it would make the function
# call `canWriteWait` with ``timeout=wait``. However, that would complicate
# the function too much if the old signature is still to be supported.
#
# If this merger is done in the future, also add the same argument to
# linlib's `writeMessage` and `requestMessage` functions.
[docs] def write(self, frame=None, *args, **kwargs):
"""Send a CAN message.
This function sends a Frame object as a CAN message. Note that the
message has been queued for transmission when this calls return. It has
not necessarily been sent.
Note:
If you are using the same channel via multiple handles, the default
behaviour is that the different handles will "hear" each other just
as if each handle referred to a channel of its own. If you open,
say, channel 0 from thread A and thread B and then send a message
from thread A, it will be "received" by thread B.
This behaviour can be changed by setting `local_txecho` to `False`
(using `~canlib.canlib.IOControl`):
>>> from canlib import canlib
>>> ch = canlib.openChannel(channel=0)
>>> ch.iocontrol.local_txecho = False
Also see `Channel.write_raw` for sending messages without constructing
`canlib.Frame` objects.
.. deprecated:: 1.5
Sending the `canlib.Frame` contents as separate arguments; this
functionality has been taken over by `write_raw`.
Args:
frame (`canlib.Frame`)
"""
if len(args) == 0 and len(kwargs) == 0:
dll.canWrite(self.handle, frame.id, bytes(frame.data), frame.dlc, frame.flags)
else:
deprecation.manual_warn(
"Calling Channel.write() with individual arguments is deprecated, "
"please use a Frame object or Channel.write_raw()"
)
if frame is None:
return self.write_raw(*args, **kwargs)
else:
return self.write_raw(frame, *args, **kwargs)
[docs] def write_raw(self, id_, msg, flag=0, dlc=None):
"""Send a CAN message
See docstring of `Channel.write` for general information about sending
CAN messages.
The variable name id (as used by canlib) is a built-in function in
Python, so the name `id_` is used instead.
Args:
id_: The identifier of the CAN message to send.
msg: An array or bytearray of the message data
flag: A combination of `~canlib.canlib.MessageFlag`. Use this
parameter e.g. to send extended (29-bit) frames.
dlc: The length of the message in bytes. For Classic CAN dlc can
be at most 8, unless `.Open.ACCEPT_LARGE_DLC` is
used. For CAN FD dlc can be one of the following 0-8, 12, 16,
20, 24, 32, 48, 64. Optional, if omitted, *dlc* is calculated
from the *msg* array.
""" # noqa: RST306
if not isinstance(msg, (bytes, str)):
if not isinstance(msg, bytearray):
msg = bytearray(msg)
msg = bytes(msg)
if dlc is None:
dlc = len(msg)
dll.canWrite(self.handle, id_, msg, dlc, flag)
[docs] def writeSync(self, timeout):
"""Wait for queued messages to be sent
Waits until all CAN messages for the specified handle are sent, or the
timeout period expires.
Args:
timeout (`int`): The timeout in milliseconds, `None` or ``0xFFFFFFFF`` for
an infinite timeout.
"""
if timeout is None:
timeout = 0xFFFFFFFF
dll.canWriteSync(self.handle, timeout)
[docs] def writeWait(self, frame, timeout, *args, **kwargs):
"""Sends a CAN message and waits for it to be sent.
This function sends a CAN message. It returns when the message is sent,
or the timeout expires. This is a convenience function that combines
write() and writeSync().
.. deprecated:: 1.5
Sending the `Frame` contents as separate arguments; this
functionality has been taken over by `writeWait_raw`.
Args:
frame (`canlib.Frame`) : Frame containing the CAN data to be sent
timeout (`int`) : The timeout, in milliseconds. 0xFFFFFFFF gives an infinite
timeout.
"""
if len(args) == 0 and len(kwargs) == 0:
dll.canWriteWait(
self.handle, frame.id, bytes(frame.data), frame.dlc, frame.flags, timeout
)
else:
deprecation.manual_warn(
"Calling Channel.writeWait() with individual arguments is deprecated, "
"please use a Frame object or Channel.writeWait_raw()"
)
return self.writeWait_raw(frame, timeout, *args, **kwargs)
[docs] def writeWait_raw(self, id_, msg, flag=0, dlc=0, timeout=0):
"""Sends a CAN message and waits for it to be sent.
This function sends a CAN message. It returns when the message is sent,
or the timeout expires. This is a convenience function that combines
write() and writeSync().
Args:
id_: The identifier of the CAN message to send.
msg: An array or bytearray of the message data
flag: A combination of `canlib.canlib.MessageFlag`. Use this
parameter e.g. to send extended (29-bit) frames.
dlc: The length of the message in bytes. For Classic CAN dlc can
be at most 8, unless `canlib.canlib.Open.ACCEPT_LARGE_DLC` is
used. For CAN FD dlc can be one of the following 0-8, 12, 16,
20, 24, 32, 48, 64. Optional, if omitted, dlc is calculated
from the msg array.
timeout: The timeout, in milliseconds. 0xFFFFFFFF gives an infinite
timeout.
""" # noqa: RST306
if not isinstance(msg, (bytes, str)):
if not isinstance(msg, bytearray):
msg = bytearray(msg)
msg = bytes(msg)
if dlc == 0:
dlc = len(msg)
dll.canWriteWait(self.handle, id_, msg, dlc, flag, timeout)
[docs] def readTimer(self):
"""Read the hardware clock on the specified device
Returns the time value.
"""
time = ct.c_int()
dll.kvReadTimer(self.handle, time)
return time.value
[docs] def read(self, timeout=0):
"""Read a CAN message and metadata.
Reads a message from the receive buffer. If no message is available,
the function waits until a message arrives or a timeout occurs.
Note:
If you are using the same channel via multiple handles, the default
behaviour is that the different handles will "hear" each other just
as if each handle referred to a channel of its own. If you open,
say, channel 0 from thread A and thread B and then send a message
from thread A, it will be "received" by thread B.
This behaviour can be changed by setting `local_txecho` to `False`
(using `~canlib.canlib.IOControl`):
>>> from canlib import canlib
>>> ch = canlib.openChannel(channel=0)
>>> ch.iocontrol.local_txecho = False
Args:
timeout (`int`): Timeout in milliseconds, -1 gives an
infinite timeout.
Returns:
`canlib.Frame`
Raises:
`~canlib.canlib.CanNoMsg`: No CAN message is currently available.
"""
# msg will be replaced by class when CAN FD is supported
msg = ct.create_string_buffer(self._MAX_MSG_SIZE)
id_ = ct.c_long()
dlc = ct.c_uint()
flag = ct.c_uint()
time = ct.c_ulong()
dll.canReadWait(self.handle, id_, msg, dlc, flag, time, timeout)
try:
flags = MessageFlag(flag.value)
# In Python 2 this will fail because the value is a C long, and the
# enums only accept int.
except ValueError:
flags = MessageFlag(int(flag.value))
length = self._calculate_length(dlc.value, flags)
return Frame(
id_=id_.value,
data=bytearray(msg.raw[:length]),
dlc=dlc.value,
flags=flags,
timestamp=time.value,
)
[docs] def readDeviceCustomerData(self, userNumber=100, itemNumber=0):
"""Read customer data stored in device"""
buf = ct.create_string_buffer(8)
user = ct.c_int(userNumber)
item = ct.c_int(itemNumber)
dll.kvReadDeviceCustomerData(self.handle, user, item, buf, ct.sizeof(buf))
return struct.unpack('!Q', buf)[0]
[docs] def readSpecificSkip(self, id_):
"""Read a message with specified identifier
Reads a message with a specified identifier from the receive
buffer. Any preceding message not matching the specified identifier
will be removed in the receive buffer. If no message with the specified
identifier is available, the function returns immediately with an error
code.
Returns:
`canlib.Frame`
"""
# msg will be replaced by class when CAN FD is supported
msg = ct.create_string_buffer(self._MAX_MSG_SIZE)
id_ = ct.c_long(id_)
dlc = ct.c_uint()
flag = ct.c_uint()
time = ct.c_ulong()
dll.canReadSpecificSkip(self.handle, id_, msg, dlc, flag, time)
try:
flags = MessageFlag(flag.value)
# In Python 2 this will fail because the value is a long, and the
# enums only accept int.
except ValueError:
flags = MessageFlag(int(flag.value))
length = self._calculate_length(dlc.value, flags)
return Frame(
id_=id_.value,
data=bytearray(msg.raw[:length]),
dlc=dlc.value,
flags=flags,
timestamp=time.value,
)
[docs] def requestChipStatus(self):
"""Request chip status messages
Requests that the hardware report the chip status (bus on/error passive
status etc.) to the driver. The chip status can later be retrieved
using `canlib.Channel.readStatus`.
Note:
The `requestChipStatus` function is asynchronous, that is, it
completes before the answer is returned from the hardware. The
time between a call to `requestChipStatus` and the point in time
where the chip status is actually available via a call to
`.Channel.readStatus` is not defined. The
`.Channel.readStatus` always returns the latest data reported
by the hardware.
"""
dll.canRequestChipStatus(self.handle)
[docs] def readStatus(self):
"""Return status for the current channel
Returns the latest status reported by the hardware in a combination of
the flags `~canlib.canlib.Stat` (bus on/error passive + status etc).
Returns:
`canlib.canlib.Stat`
"""
flags = ct.c_ulong(0)
dll.canReadStatus(self.handle, ct.byref(flags))
return Stat(flags.value)
[docs] def readSyncSpecific(self, id_, timeout=0):
"""Wait until the receive queue contains a message with the specified id"""
id_ = ct.c_long(id_)
dll.canReadSyncSpecific(self.handle, id_, timeout)
[docs] def read_error_counters(self):
"""Read the error counters of the CAN controller
Returns the latest known values of the error counters in the specified
circuit. If the error counters change values precisely when
`read_error_counters` is called, it may not be reflected in the
returned result.
Use `.clear_error_counters` via `.Channel.iocontrol` to clear
the counters.
Returns:
The returned tuple is a ``(rx, tx, overrun)`` named tuple of:
#. ``rx`` (`int`): Receive error counter
#. ``tx`` (`int`): Transmit error counter
#. ``overrun`` (`int`): Number of overrun errors.
.. versionadded:: 1.11
"""
txErr = ct.c_int()
rxErr = ct.c_int()
ovErr = ct.c_int()
dll.canReadErrorCounters(self.handle, ct.byref(txErr), ct.byref(rxErr), ct.byref(ovErr))
error_counters = ErrorCounters(
tx=txErr.value,
rx=rxErr.value,
overrun=ovErr.value,
)
return error_counters
[docs] def scriptSendEvent(
self, slotNo=0, eventType=const.kvEVENT_TYPE_KEY, eventNo=None, data=0
):
"""Send specified event to specified t script
Send an event with given type, event number, and associated data to a
script running in a specific slot.
"""
if eventNo is None:
ord('a')
dll.kvScriptSendEvent(
self.handle, ct.c_int(slotNo), ct.c_int(eventType), ct.c_int(eventNo), ct.c_uint(data)
)
[docs] def setBusOutputControl(self, drivertype=Driver.NORMAL):
"""Set driver type
This function sets the driver type for a CAN controller to e.g. silent
mode. This corresponds loosely to the bus output control register in
the CAN controller, hence the name of this function.
Note:
Not all CAN driver types are supported on all cards.
Args:
drivertype (`.canlib.Driver`): Driver type to set.
"""
dll.canSetBusOutputControl(self.handle, drivertype)
[docs] @deprecation.deprecated.favour(".iocontrol.flush_rx_buffer()")
def ioCtl_flush_rx_buffer(self):
"""Deprecated function
.. deprecated:: 1.5
Use `IOControl`; ``Channel.iocontrol.flush_rx_buffer()``.
"""
dll.canIoCtl(self.handle, const.canIOCTL_FLUSH_RX_BUFFER, None, 0)
[docs] @deprecation.deprecated.favour("'Channel.iocontrol.timer_scale = scale'")
def ioCtl_set_timer_scale(self, scale):
"""Deprecated function
.. deprecated:: 1.5
Use `IOControl`; ``Channel.iocontrol.timer_scale = scale``
"""
scale = ct.c_long(scale)
dll.canIoCtl(
self.handle, const.canIOCTL_SET_TIMER_SCALE, ct.byref(scale), ct.sizeof(scale)
)
[docs] @deprecation.deprecated.favour("'Channel.iocontrol.report_access_errors' attribute")
def ioCtl_get_report_access_errors(self):
"""Deprecated function
.. deprecated:: 1.5
Use `IOControl`; ``Channel.iocontrol.report_access_errors``
"""
buf = ct.c_ubyte()
dll.canIoCtl(
self.handle, const.canIOCTL_GET_REPORT_ACCESS_ERRORS, ct.byref(buf), ct.sizeof(buf)
)
return buf.value
[docs] @deprecation.deprecated.favour("'Channel.iocontrol.report_access_errors = on'")
def ioCtl_set_report_access_errors(self, on=0):
"""Deprecated function
.. deprecated:: 1.5
Use `IOControl`; ``Channel.iocontrol.report_access_errors = on``
"""
buf = ct.c_ubyte(on)
dll.canIoCtl(
self.handle, const.canIOCTL_SET_REPORT_ACCESS_ERRORS, ct.byref(buf), ct.sizeof(buf)
)
[docs] def flashLeds(self, action, timeout_ms):
"""Turn Leds on or off.
Args:
action (`int`): One of `~canlib.canlib.LEDAction`, defining
which LED to turn on or off.
timeout_ms (`int`): Specifies the time, in milliseconds, during which
the action is to be carried out. When the timeout
expires, the LED(s) will return to its ordinary
function.
"""
dll.kvFlashLeds(self.handle, action, timeout_ms)
[docs] @deprecation.deprecated.favour("ChannelData.device_name")
def getChannelData_Name(self):
"""Deprecated function
.. deprecated:: 1.5
Use `ChannelData`; ``ChannelData.device_name``
"""
return wrapper.getChannelData_Name(self.index)
[docs] @deprecation.deprecated.favour("ChannelData.custom_name")
def getChannelData_Cust_Name(self):
"""Deprecated function
.. deprecated:: 1.5
Use `ChannelData`; ``ChannelData.custom_name``
"""
try:
return wrapper.getChannelData_Cust_Name(self.index)
except CanError:
None
return ""
return ""
[docs] @deprecation.deprecated.favour("ChannelData.chan_no_on_card")
def getChannelData_Chan_No_On_Card(self):
"""Deprecated function
.. deprecated:: 1.5
Use `ChannelData`; ``ChannelData.chan_no_on_card``
"""
return wrapper.getChannelData_Chan_No_On_Card(self.index)
[docs] @deprecation.deprecated.favour("ChannelData.card_number")
def getChannelData_CardNumber(self):
"""Deprecated function
.. deprecated:: 1.5
Use `ChannelData`; ``ChannelData.card_number``
"""
return wrapper.getChannelData_CardNumber(self.index)
[docs] @deprecation.deprecated.favour("ChannelData.card_upc_no")
def getChannelData_EAN(self):
"""Deprecated function
.. deprecated:: 1.5
Use `ChannelData`; ``ChannelData.card_upc_no``
"""
return wrapper.getChannelData_EAN(self.index)
[docs] @deprecation.deprecated
def getChannelData_EAN_short(self):
return wrapper.getChannelData_EAN_short(self.index)
[docs] @deprecation.deprecated.favour("ChannelData.card_serial_no")
def getChannelData_Serial(self):
"""Deprecated function
.. deprecated:: 1.5
Use `ChannelData`; ``ChannelData.card_serial_no``
"""
return wrapper.getChannelData_Serial(self.index)
[docs] @deprecation.deprecated.favour("ChannelData.driver_name")
def getChannelData_DriverName(self):
"""Deprecated function
.. deprecated:: 1.5
Use `ChannelData`; ``ChannelData.driver_name``
"""
return wrapper.getChannelData_DriverName(self.index)
[docs] @deprecation.deprecated.favour("ChannelData.card_firmware_rev")
def getChannelData_Firmware(self):
"""Deprecated function
.. deprecated:: 1.5
Use `ChannelData`; ``ChannelData.card_firmware_rev``
"""
return wrapper.getChannelData_Firmware(self.index)
[docs] def scriptStart(self, slot):
"""Start previously loaded script in specified slot"""
dll.kvScriptStart(self.handle, slot)
[docs] def scriptStatus(self, slot):
"""Retreives t program status for selected slot
Args:
slot (`int`) : Slot number to be queried
Returns:
`canlib.ScriptStatus`
.. versionadded:: 1.6
"""
status = ct.c_uint()
dll.kvScriptStatus(self.handle, slot, ct.byref(status))
return ScriptStatus(status.value)
[docs] def scriptStop(self, slot, mode=ScriptStop.NORMAL):
"""Stop script running in specified slot
Args:
slot (`int`) : slot containing the running script we want to stop.
mode (`canlib.ScriptStop`) : Default mode is `canlib.ScriptStop.NORMAL`.
"""
dll.kvScriptStop(self.handle, slot, mode)
[docs] def scriptUnload(self, slot):
"""Unload previously stopped script in specified slot"""
dll.kvScriptUnload(self.handle, slot)
[docs] def scriptLoadFileOnDevice(self, slot, localFile):
"""Load compiled, locally stored, script file
Loads a compiled script file (.txe) stored locally on the device (SD
card) into a script slot on the device. The scripts default channel
will be the same channel used when this Channel object was created.
Args:
slot (`int`) : slot containing the running script we want to stop.
localFile (`str`): Name of compiled script (.txe) to load.
"""
dll.kvScriptLoadFileOnDevice(self.handle, slot, ct.c_char_p(localFile))
[docs] def scriptLoadFile(self, slot, filePathOnPC):
"""Load compiled script file from host(PC)
Loads a compiled script file (.txe) stored on the host (PC) into a
script slot on the device. The scripts default channel will be the same
channel used when this Channel object was created.
Args:
slot (`int`) : slot containing the running script we want to stop.
filePathOnPC (`str`): Path to compiled script (.txe) to load.
"""
c_filePathOnPC = ct.c_char_p(filePathOnPC.encode('utf-8'))
dll.kvScriptLoadFile(self.handle, slot, c_filePathOnPC)
[docs] def scriptEnvvarOpen(self, name):
"""Low level function to open an Envvar
This should normally not be used directly, instead opening and closing
of an envvar is automatically done when accessing via the `EnvVar`
class through `Channel.envvar`
"""
envvarType = ct.c_int()
envvarSize = ct.c_int()
envvarName = ct.create_string_buffer(name.encode('utf-8'))
envHandle = dll.kvScriptEnvvarOpen(
self.handle, envvarName, ct.byref(envvarType), ct.byref(envvarSize)
)
return envHandle, envvarType.value, envvarSize.value
[docs] def scriptEnvvarClose(self, envHandle):
"""Low level function to close an Envvar
This should normally not be used directly, instead opening and closing
of an envvar is automatically done when accessing via the `EnvVar`
class through `Channel.envvar`
"""
dll.kvScriptEnvvarClose(ct.c_int64(envHandle))
[docs] def scriptEnvvarSetInt(self, envHandle, value):
"""Low level function to set an Envvar of type int
This should normally not be used directly, instead set and get the
value of an envvar using the `EnvVar` class through `Channel.envvar`
"""
value = int(value)
dll.kvScriptEnvvarSetInt(ct.c_int64(envHandle), ct.c_int(value))
[docs] def scriptEnvvarGetInt(self, envHandle):
"""Low level function to read an Envvar of type int
This should normally not be used directly, instead set and get the
value of an envvar using the `EnvVar` class through `Channel.envvar`
"""
envvarValue = ct.c_int()
dll.kvScriptEnvvarGetInt(ct.c_int64(envHandle), ct.byref(envvarValue))
return envvarValue.value
[docs] def scriptEnvvarSetFloat(self, envHandle, value):
"""Low level function to set an Envvar of type float
This should normally not be used directly, instead set and get the
value of an envvar using the `EnvVar` class through `Channel.envvar`
"""
value = float(value)
dll.kvScriptEnvvarSetFloat(ct.c_int64(envHandle), ct.c_float(value))
[docs] def scriptEnvvarGetFloat(self, envHandle):
"""Low level function to read an Envvar of type float
This should normally not be used directly, instead set and get the
value of an envvar using the `EnvVar` class through `Channel.envvar`
"""
envvarValue = ct.c_float()
dll.kvScriptEnvvarGetFloat(ct.c_int64(envHandle), ct.byref(envvarValue))
return envvarValue.value
[docs] @deprecation.deprecated.favour(".script_envvar_set_data()")
def scriptEnvvarSetData(self, envHandle, value, envSize):
value_p = ct.create_string_buffer(value.encode('utf-8'))
dll.kvScriptEnvvarSetData(ct.c_int64(envHandle), value_p, 0, ct.c_int(envSize))
[docs] def script_envvar_set_data(self, envHandle, value, len, start=0):
"""Low level function to write a slice of an Envvar of type char
This should normally not be used directly, instead set and get the
value of an envvar using the `EnvVar` class through `Channel.envvar`
`value` needs to be a bytes-like object or list
.. versionadded:: 1.10
"""
if isinstance(value, list):
value = bytes(value)
char_array = ct.c_char * len
value_p = char_array.from_buffer_copy(value)
dll.kvScriptEnvvarSetData(ct.c_int64(envHandle), value_p, start, len)
[docs] @deprecation.deprecated.favour(".script_envvar_get_data()")
def scriptEnvvarGetData(self, envHandle, envSize):
envvarValue = ct.create_string_buffer(envSize)
dll.kvScriptEnvvarGetData(
ct.c_int64(envHandle), ct.byref(envvarValue), 0, ct.c_int(envSize)
)
return envvarValue.value
[docs] def script_envvar_get_data(self, envHandle, len, start=0):
# rename to length?
"""Low level function to read a slice of an Envvar of type char
This should normally not be used directly, instead set and get the
value of an envvar using the `EnvVar` class through `Channel.envvar`
.. versionadded:: 1.10
"""
char_array = (ct.c_char * len)()
dll.kvScriptEnvvarGetData(ct.c_int64(envHandle), ct.byref(char_array), start, len)
return bytes(char_array)
[docs] def scriptGetText(self):
"""Read text from subscribed script slots
Text-subscriptions must first be set up with
`Channel.scriptRequestText`.
Returns:
`ScriptText`
Raises:
`~canlib.canlib.CanNoMsg`: No more text is currently available.
.. versionadded:: 1.7
"""
slot = ct.c_int()
time = ct.c_ulong()
flags = ct.c_uint()
text = ct.create_string_buffer(1024)
dll.kvScriptGetText(
self.handle,
ct.byref(slot),
ct.byref(time),
ct.byref(flags),
text,
ct.sizeof(text),
)
return ScriptText(text.value.decode('utf-8'), slot.value, time.value, Stat(flags.value))
[docs] def scriptRequestText(self, slot, request=ScriptRequest.SUBSCRIBE):
"""Set up a printf subscription to a selected script slot
Arguments:
slot (`int`): The script slot to subscribe/unsubscribe from.
request(`~canlib.canlib.ScriptRequest`): Whether to subscribe or unsubscribe.
Text printed with ``printf()`` by a t-program that you are subscribed to
is saved and can be retrieved with `Channel.scriptGetText`.
.. versionadded:: 1.7
"""
dll.kvScriptRequestText(self.handle, slot, request)
[docs] def set_callback(self, function, event, context=None):
"""Register callback function
This will register a callback function which is called when certain
events occur. You can register at most one callback function per handle
at any time.
Note:
The callback function is called in the context of a high-priority
thread created by CANlib. You should take precaution not to do any time
consuming tasks in the callback.
Small example of usage::
# Declare callback function
def callback_func(hnd, context, event):
event = canlib.Notify(event)
# The nonlocal statement causes the listed identifiers to refer
# to previously bound variables in the nearest enclosing scope
# excluding globals.
nonlocal callback_has_been_called
print("Callback called, context:{}, event:{!r}".format(context, event))
# Notify the main program by setting the flag
callback_has_been_called = True
# setup communication variable and callback
callback_has_been_called = False
callback = canlib.dll.KVCALLBACK_T(callback_func)
with canlib.openChannel(0) as ch:
ch.set_callback(callback, context=121, event=canlib.Notify.BUSONOFF)
# trigger the callback
ch.busOn()
# do something else
time.sleep(0.5)
# Verify that the callback was triggered
assert callback_has_been_called
Note:
It is very important to make sure that you keep a reference to the
callback type (``callback`` in the sample above) for as long as any C
library might call it. If it gets deleted by the garbage collector,
calling it from C is likely to either cause a segfault or maybe even
interpret random memory as machine language.
Arguments:
function (`~canlib.canlib.KVCALLBACK_T`): A ctypes wrapped Python function
event (`~canlib.canlib.Notify`): A combination of flags to indicate
what events to trigger on
.. versionadded:: 1.7
"""
dll.kvSetNotifyCallback(self.handle, function, context, event)
[docs] def fileGetCount(self):
"""Get the number of files on the device.
Returns:
`int`: The number of files.
"""
count = ct.c_int()
dll.kvFileGetCount(self.handle, ct.byref(count))
return count.value
[docs] def fileGetName(self, fileNo):
"""Get the name of the file with the supplied number.
Args:
fileNo (`int`): The number of the file.
Returns:
`str`: The name of the file.
"""
fileName = ct.create_string_buffer(50)
dll.kvFileGetName(self.handle, ct.c_int(fileNo), fileName, ct.sizeof(fileName))
return fileName.value.decode('utf-8')
[docs] def fileDelete(self, deviceFileName):
"""Delete file from device."""
dll.kvFileDelete(self.handle, deviceFileName.encode('utf-8'))
[docs] def fileCopyToDevice(self, hostFileName, deviceFileName=None):
"""Copy an arbitrary file from the host to the () isdevice.
The filename must adhere to the FAT '8.3' naming standard, max 8
characters - a dot - max 3 characters.
Args:
hostFileName (`str`): The target host file name.
deviceFileName (`str`, optional): The device file name.
Defaults to the same as *hostFileName*.
"""
if deviceFileName is None:
deviceFileName = hostFileName
dll.kvFileCopyToDevice(
self.handle, hostFileName.encode('utf-8'), deviceFileName.encode('utf-8')
)
[docs] def fileCopyFromDevice(self, deviceFileName, hostFileName=None):
"""Copy an arbitrary file from the device to the host.
Args:
deviceFileName (`str`): The device file name.
hostFileName (`str`, optional): The target host file name.
Defaults to deviceFileName.
"""
if hostFileName is None:
hostFileName = deviceFileName
dll.kvFileCopyFromDevice(
self.handle, deviceFileName.encode('utf-8'), hostFileName.encode('utf-8')
)
[docs] def kvDeviceSetMode(self, mode):
"""Set the current device's mode.
Note:
The mode is device specific, which means that not all modes are
implemented in all products.
Args:
mode (`int`): One of `~canlib.canlib.DeviceMode`, defining which
mode to use.
"""
dll.kvDeviceSetMode(self.handle, ct.c_int(mode))
[docs] def kvDeviceGetMode(self):
"""Read the current device's mode.
Note:
The mode is device specific, which means that not all modes are
implemented in all products.
Returns:
`int`: One of `DeviceMode`, indicating which mode is in use.
"""
mode = ct.c_int()
dll.kvDeviceGetMode(self.handle, ct.byref(mode))
return DeviceMode(mode.value)
[docs] def number_of_io_pins(self):
"""Return the total number of available I/O pins
.. versionadded:: 1.8
"""
pin_count = ct.c_uint()
dll.kvIoGetNumberOfPins(self.handle, ct.byref(pin_count))
return pin_count.value
[docs] def io_pins(self):
"""Generator that returns all I/O pins one by one
Returns object depending on pin type and direction: `iopin.AnalogIn`,
`iopin.AnalogOut`, `iopin.DigitalIn`, `iopin.DigitalOut` or
`iopin.Relay`.
.. versionadded:: 1.8
"""
for index in range(self.number_of_io_pins()):
yield iopin.get_io_pin(channel=self, index=index)
[docs] def io_confirm_config(self):
"""Confirm current I/O configuration
It is required to confirm a configuration by calling this function
before accessing any I/O pins value.
.. versionadded:: 1.8
"""
dll.kvIoConfirmConfig(self.handle)
[docs] def get_io_pin(self, index):
"""Return I/O pin using *index*
Returns:
`iopin.IoPin`: io pin object for *index* (Any of `iopin.AnalogIn`,
`iopin.DigitalOut` etc)
.. versionadded:: 1.8
"""
return iopin.get_io_pin(channel=self, index=index)
[docs] def is_can_fd(self):
"""Return `True` if the channel has been opened with the
`~Open.CAN_FD` or `~Open.CAN_FD_NONISO` flags.
Returns:
`True` if CAN FD, `False` otherwise.
.. versionadded: 1.17
"""
return self.flags & Open.CAN_FD or self.flags & Open.CAN_FD_NONISO