"""Wrapper for Kvaser Database API (kvaDbLib)."""
import ctypes as ct
from ..canlib import MessageFlag as CanMessageFlag
from ..frame import Frame
from . import wrapper
from .attribute import Attribute
from .bound_message import BoundMessage
from .enums import (AttributeOwner, MessageFlag, SignalByteOrder,
SignalMultiplexMode, SignalType)
from .exceptions import (KvdNoAttribute, KvdNoMessage, KvdNoSignal,
KvdWrongOwner)
from .node import Node
from .signal import EnumSignal, Signal
from .wrapper import dll
def _signal_object(db, msg, handle):
"""Instantiate either a `Signal` or an `EnumSignal`
This function checks whether the signal pointed to by ``handle`` has any
enumeration values; if it does, an `EnumSignal` is created and
returned -- otherwise, a `Signal` is created and returned.
"""
try:
dll.kvaDbGetFirstEnumValue(handle, ct.c_void_p())
except KvdNoAttribute:
return Signal(db, msg, handle, mode=None, scaling=None)
else:
return EnumSignal(db, msg, handle, mode=None, scaling=None)
[docs]class Message:
"""Database message, holds signals."""
def __init__(self, db, handle, name=None, id=None, flags=None, dlc=None, comment=None):
"""Create a message and optionally set name, id and/or flags."""
self._handle = handle
self._can_data = ct.create_string_buffer(64) # for signal data
# We need to save a pointer to the database since the API does not have
# a way to get database handle from a message handle
self._db = db
if name is not None:
self.name = name
if id is not None:
self.id = id
if flags is not None:
self.flags = flags
if dlc is not None:
self.dlc = dlc
if comment is not None:
self.comment = comment
def __eq__(self, other):
attrs = 'comment dlc id flags name'.split()
return all(getattr(self, a) == getattr(other, a) for a in attrs)
def __iter__(self):
return self.signals()
def __len__(self):
"""Returns number of signals in message."""
return sum(1 for _ in self)
def __ne__(self, other):
return not self == other
def __str__(self):
# __repr__ should never do any calls to the dll, so this needs to be done in __str__
return "Message(name={!r}, id={!r}, flags={!r}, dlc={!r}, comment={!r})".format(
self.name, self.id, self.flags, self.dlc, self.comment
)
[docs] def asframe(self):
"""Creates a Frame object with empty data matching this message"""
length = wrapper.dlc_to_bytes(
self.dlc,
self._db.protocol.value,
)
# convert flags from kvaDbLib format to CANlib format
can_flags = CanMessageFlag(0)
if self.flags & MessageFlag.EXT:
can_flags |= CanMessageFlag.EXT
return Frame(id_=self.id, data=bytearray(length), flags=can_flags)
[docs] def attributes(self):
"""Return a generator over all message attributes."""
ah = None
nah = ct.c_void_p()
try:
dll.kvaDbGetFirstMsgAttribute(self._handle, ct.byref(nah))
except KvdNoAttribute:
return
while nah.value is not None:
ah, nah = nah, ct.c_void_p()
yield Attribute(self, ah)
try:
dll.kvaDbGetNextAttribute(ah, ct.byref(nah))
except KvdNoAttribute:
return
[docs] def bind(self, frame=None):
"""Bind this message to a frame
Creates a new BoundMessage object representing this message bound to
the given Frame object, or a new Frame object if `frame` is `None`.
"""
return BoundMessage(self, frame or self.asframe())
[docs] def delete_attribute(self, name):
"""Delete attribute from message."""
ah = ct.c_void_p()
dll.kvaDbGetMsgAttributeByName(self._handle, name.encode('utf-8'), ct.byref(ah))
dll.kvaDbDeleteMsgAttribute(self._handle, ah)
[docs] def delete_signal(self, signal):
"""Delete signal from message.
Args:
signal (`Signal`): signal to be deleted
"""
dll.kvaDbDeleteSignal(self._handle, signal._handle)
[docs] def get_attribute_value(self, name):
"""Return attribute value
If the attribute is not set on the message, we return the attribute
definition default value.
.. versionchanged:: 1.18
When an EnumAttribute is not set, the default value will now be
returned as `int` (instead of `EnumValue` with empty `name`).
"""
ah = ct.c_void_p()
# Try and find attribute on message
try:
dll.kvaDbGetMsgAttributeByName(self._handle, name.encode('utf-8'), ct.byref(ah))
except KvdNoAttribute:
# Lookup the attribute definition
atr_def = self._db.get_attribute_definition_by_name(name)
# only attributes with message as owner are valid, name is also
# unique accross all attributes so it is enough to check this one
# for owner
if atr_def.owner != AttributeOwner.MESSAGE:
raise KvdWrongOwner()
value = atr_def.definition.default
else:
attribute = Attribute(self._db, ah)
value = attribute.value
# if the attribute was an EnumAttribute, find the value
try:
value = value.value
except AttributeError:
pass
return value
[docs] def get_signal(self, name):
"""Find signal in message by name."""
return self.get_signal_by_name(name)
[docs] def get_signal_by_name(self, name):
"""Find signal in message by name."""
sh = ct.c_void_p(None)
dll.kvaDbGetSignalByName(self._handle, name.encode('utf-8'), ct.byref(sh))
signal = _signal_object(self._db, self, sh)
return signal
[docs] def new_signal(
self,
name,
type=SignalType.UNSIGNED,
byte_order=SignalByteOrder.INTEL,
mode=SignalMultiplexMode.SIGNAL,
representation=None,
size=None,
scaling=None,
limits=None,
unit=None,
comment=None,
enums=None,
):
"""Create and add a new signal to the message."""
sh = ct.c_void_p(None)
dll.kvaDbAddSignal(self._handle, ct.byref(sh))
if type > SignalType._ENUM_SEPARATOR:
type -= SignalType._ENUM_SEPARATOR
signal = EnumSignal(
self._db,
self,
sh,
name,
type,
byte_order,
mode,
size,
scaling,
limits,
unit,
comment,
enums,
)
else:
signal = Signal(
self._db,
self,
sh,
name=name,
type=type,
byte_order=byte_order,
mode=mode,
representation=representation,
size=size,
scaling=scaling,
limits=limits,
unit=unit,
comment=comment,
)
return signal
[docs] def set_attribute_value(self, name, value):
"""Set value of attribute 'name' on message.
If no attribute called 'name' is set on message, attach a message
attribute from the database attribute definition first.
"""
ah = ct.c_void_p()
# Try and find attribute on message
try:
dll.kvaDbGetMsgAttributeByName(self._handle, name.encode('utf-8'), ct.byref(ah))
except KvdNoAttribute:
# If no attribute was found, lookup the attribute definition and
# add a new attribute to the message
attrib_def = self._db.get_attribute_definition_by_name(name)
dll.kvaDbAddMsgAttribute(self._handle, attrib_def._handle, ct.byref(ah))
# Set the value in the message attribute
attribute = Attribute(self._db, ah)
attribute.value = value
[docs] def signals(self):
"""Return a generator of all signals in message."""
sh = ct.c_void_p(None)
try:
dll.kvaDbGetFirstSignal(self._handle, ct.byref(sh))
except (KvdNoSignal, KvdNoMessage):
# KvdNoMessage is reported with older (5.22?) dlls
return
while sh.value is not None:
yield _signal_object(self._db, self, sh)
sh = ct.c_void_p()
try:
dll.kvaDbGetNextSignal(self._handle, ct.byref(sh))
except (KvdNoSignal, KvdNoMessage):
# KvdNoMessage is reported with older (5.22?) dlls
return
@property
def comment(self):
"""`str`: Comment message"""
buf = ct.create_string_buffer(255)
dll.kvaDbGetMsgComment(self._handle, buf, ct.sizeof(buf))
try:
comment = buf.value.decode('utf-8')
except UnicodeDecodeError:
comment = buf.value.decode('cp1252')
return comment
@comment.setter
def comment(self, value):
dll.kvaDbSetMsgComment(self._handle, value.encode('utf-8'))
@property
def dlc(self):
"""`int`: The message dlc"""
c_dlc = ct.c_int(0)
dll.kvaDbGetMsgDlc(self._handle, ct.byref(c_dlc))
return c_dlc.value
@dlc.setter
def dlc(self, dlc):
dll.kvaDbSetMsgDlc(self._handle, dlc)
@property
def id(self):
"""`int`: The message identifier"""
c_id = ct.c_uint(0)
dll.kvaDbGetMsgIdEx(self._handle, ct.byref(c_id))
return c_id.value
@id.setter
def id(self, value):
dll.kvaDbSetMsgIdEx(self._handle, value)
@property
def flags(self):
"""`MessageFlag`: The message flags"""
c_flags = ct.c_uint(0)
dll.kvaDbGetMsgFlags(self._handle, ct.byref(c_flags))
try:
# There is no guarantee the flags from the dbc file will be valid
# MessageFlags
return MessageFlag(c_flags.value)
except ValueError:
return c_flags.value
@flags.setter
def flags(self, value):
"""Set the message flags."""
dll.kvaDbSetMsgFlags(self._handle, value)
@property
def name(self):
"""`str`: The message name"""
buf = ct.create_string_buffer(255)
dll.kvaDbGetMsgName(self._handle, buf, ct.sizeof(buf))
return buf.value.decode('utf-8')
@name.setter
def name(self, value):
dll.kvaDbSetMsgName(self._handle, value.encode('utf-8'))
@property
def qualified_name(self):
"""`str`: The qualified message name
Returns database and message names separated by a dot.
"""
buf = ct.create_string_buffer(255)
dll.kvaDbGetMsgQualifiedName(self._handle, buf, ct.sizeof(buf))
return buf.value.decode('utf-8')
@property
def send_node(self):
"""`Node`: The send node for this message."""
nh = ct.c_void_p()
dll.kvaDbGetMsgSendNode(self._handle, ct.byref(nh))
node = Node(self._db, nh)
return node
@send_node.setter
def send_node(self, value):
dll.kvaDbSetMsgSendNode(self._handle, value._handle)