Source code for canlib.kvadblib.dbc

"""Wrapper for Kvaser Database API (kvaDbLib).  For more info, see the kvaDbLib
   help files which are availible in the CANlib SDK.
   https://www.kvaser.com/developer/canlib-sdk/

"""

import ctypes as ct

from canlib.canlib.enums import MessageFlag as canlib_MessageFlag

from .attribute import Attribute
from .attributedef import AttributeDefinition
from .enums import AttributeOwner, MessageFlag, ProtocolType
from .exceptions import (KvdDbFileParse, KvdNoAttribute, KvdNoMessage,
                         KvdNoNode, KvdOnlyOneAllowed, KvdWrongOwner)
from .message import Message
from .node import Node
from .wrapper import dll, get_last_parse_error

DATABASE_FLAG_J1939 = 0x0001  #: Flag indicating that the database uses the J1939 protocol.


[docs]class Dbc: """Holds a dbc database. There are three ways to create a database: 1. To load data from an existing database file, only set filename to the database filename. 2. To add an empty database, set only name. 3. To load data from an existing database file and give it a new name, set name to the new name and filename to the existing database filename. Either a name or a filename must be given. Args: filename (str, optional): The existing database file is read. name (str, optional): The database name will be set. Raises: KvdDbFileParse: If the database file can't be parsed. When parsing a database file fails, `get_last_parse_error` can be called:: try: kvadblib.Dbc(filename="database.dbc") except kvadblib.KvdDbFileParse: print(kvadblib.get_last_parse_error()) .. versionchanged:: 1.10 `Dbc` now raises specific errors depending on the situation. """ def __init__(self, filename=None, name=None, protocol=None): if name is None and filename is None: raise TypeError('Either a name or filename must to be given.') self._handle = None handle = ct.c_void_p(None) dll.kvaDbOpen(ct.byref(handle)) self._handle = handle if name: name = name.encode('utf-8') if filename: # use str() here to allow for Path objects as filename filename = str(filename).encode('utf-8') try: dll.kvaDbCreate(self._handle, name, filename) except KvdDbFileParse: print(get_last_parse_error()) raise # set default protocol to CAN if protocol is not None: self.protocol = protocol if protocol is None and self.protocol == ProtocolType.UNKNOWN: self.protocol = ProtocolType.CAN def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def __iter__(self): """Return a generator of all normal database messages. Equivalent to Dbc.messages(show_all=False). """ return self.messages(show_all=False) def __len__(self): """Returns number of messages in database.""" return sum(1 for _ in self) def __str__(self): return "Dbc {}: flags:{}, protocol:{}, messages:{}".format( self.name, self.flags, self.protocol.name, len(self) )
[docs] def attribute_definitions(self): """Return a generator over all database attribute definitions.""" adh = None nadh = ct.c_void_p() try: dll.kvaDbGetFirstAttributeDefinition(self._handle, ct.byref(nadh)) except KvdNoAttribute: return while nadh.value is not None: adh, nadh = nadh, ct.c_void_p() yield AttributeDefinition(self, adh) try: dll.kvaDbGetNextAttributeDefinition(adh, ct.byref(nadh)) except KvdNoAttribute: return return
[docs] def attributes(self): """Return a generator over all database attributes. .. versionadded:: 1.6 """ ah = None nah = ct.c_void_p() try: dll.kvaDbGetFirstAttribute(self._handle, ct.byref(nah)) except KvdNoAttribute: return while nah.value is not None: ah, nah = nah, ct.c_void_p() yield Attribute(self, ah) try: dll.kvaDbGetNextAttribute(ah, ct.byref(nah)) except KvdNoAttribute: return
[docs] def close(self): """Close an open database handle.""" if self._handle: dll.kvaDbClose(self._handle)
[docs] def delete_attribute(self, name): """Delete attribute from database. .. versionadded:: 1.6 """ ah = ct.c_void_p() dll.kvaDbGetAttributeByName(self._handle, name.encode('utf-8'), ct.byref(ah)) dll.kvaDbDeleteAttribute(self._handle, ah)
[docs] def delete_attribute_definition(self, name): """Delete attribute definition from database. .. versionadded:: 1.7 """ adh = ct.c_void_p(None) dll.kvaDbGetAttributeDefinitionByName(self._handle, name.encode('utf-8'), ct.byref(adh)) dll.kvaDbDeleteAttributeDefinition(self._handle, adh)
[docs] def delete_message(self, message): """Delete message from database. Args: message (`Message`): message to be deleted """ dll.kvaDbDeleteMsg(self._handle, message._handle)
[docs] def delete_node(self, node): """Delete node from database. Args: node (`Node`): node to be deleted """ dll.kvaDbDeleteNode(self._handle, node._handle)
[docs] def get_attribute_definition_by_name(self, name): """Find attribute definition using name. Args: name (str): name of attribute definition Returns an attribute definition object depending on type, e.g. if the type is AttributeType.INTEGER, an `IntegerAttributeDefinition` is returned. """ adh = ct.c_void_p(None) dll.kvaDbGetAttributeDefinitionByName(self._handle, name.encode('utf-8'), ct.byref(adh)) return AttributeDefinition(self, adh)
[docs] def get_attribute_value(self, name): """Return attribute value If the attribute is not set on the database, we return the attribute definition default value. .. versionadded:: 1.6 """ ah = ct.c_void_p() # Try and find attribute on message try: dll.kvaDbGetAttributeByName(self._handle, name.encode('utf-8'), ct.byref(ah)) except KvdNoAttribute: # Lookup the attribute definition atr_def = self.get_attribute_definition_by_name(name) # only attributes with database as owner are valid, name is also # unique accross all attributes so it is enough to check this one # for owner if atr_def.owner != AttributeOwner.DB: raise KvdWrongOwner() value = atr_def.definition.default else: attribute = Attribute(self, ah) value = attribute.value return value
[docs] def get_message(self, id=None, name=None): """Find message by id or name If both id and name is given, both most match. Note that bit 31 of the id indicates if the message has an extended id or not. Args: id (int): message id to look for name (str): message name to look for Returns: `Message` Raises: KvdNoMessage: If no match was found, or if none of `id` and `name` were given. """ message = None if (id is not None) and (name is not None): # Both arguments were given message = self.get_message_by_id(id, id & MessageFlag.EXT) if message.name == name: return message else: raise KvdNoMessage() else: if id is not None: return self.get_message_by_id(id, id & MessageFlag.EXT) else: return self.get_message_by_name(name)
[docs] def get_message_by_id(self, id, flags): """Find message by id. Args: id (int): message id to look for flags (int): message flags, e.g. `kvadblib.MessageFlag.EXT` (Note that `kvadblib.MessageFlag.EXT` != `canlib.MessageFlag.EXT`) Returns: `Message` Raises: KvdNoMessage: If no match was found. """ # Still using legacy function kvaDbGetMsgById() in order to be # backwards compliant. flags &= MessageFlag.EXT id |= flags mh = ct.c_void_p(None) dll.kvaDbGetMsgById(self._handle, id, ct.byref(mh)) message = Message(db=self, handle=mh) return message
[docs] def get_message_by_name(self, name): """Find message by name Args: name (str): message name to look for Returns: `Message` Raises: KvdNoMessage: If no match was found. """ mh = ct.c_void_p(None) dll.kvaDbGetMsgByName(self._handle, name.encode('utf-8'), ct.byref(mh)) message = Message(self, mh) return message
[docs] def get_message_by_pgn(self, can_id): """Find message using the PGN part of the given CAN id. Note: The `can_id` must have the `MessageFlag.EXT` flag bit set (bit 31, 0x80000000) :: dbc = kvadblib.Dbc("My_j1939_database.dbc") can_id = 0xff4200 can_id |= kvadblib.MessageFlag.EXT dbc.get_message_by_pgn(can_id) Args: can_id (int): CAN id containing PGN to look for Returns: `Message` Raises: KvdNoMessage: If no match was found. .. versionadded:: 1.18 """ mh = ct.c_void_p(None) dll.kvaDbGetMsgByPGN(self._handle, can_id, ct.byref(mh)) message = Message(db=self, handle=mh) return message
[docs] def get_node_by_name(self, name): """Find node by name Args: name (str): node name to look for Returns: `Node` Raises: KvdNoNode: If no match was found. """ nh = ct.c_void_p(None) dll.kvaDbGetNodeByName(self._handle, name.encode('utf-8'), ct.byref(nh)) return Node(self, nh)
[docs] def messages(self, show_all=True): """Return a generator of all database messages. If you would like to omit the special message 'VECTOR__INDEPENDENT_SIG_MSG', which is used to collect removed signals under, set show_all to False or use the Dbc iterator directly:: db = kvadblib.Dbc(filename='mydb.dbc') for message in db: print(msg) Args: show_all (`bool`): If true, all messages, including any special message such as 'VECTOR__INDEPENDENT_SIG_MSG' will be returned .. versionchanged:: 1.8 Added argument show_all. """ mh = ct.c_void_p(None) try: dll.kvaDbGetFirstMsg(self._handle, ct.byref(mh)) except KvdNoMessage: return while mh.value is not None: msg = Message(self, mh) # The special message name VECTOR__INDEPENDENT_SIG_MSG is used in # .dbc files to collect unused signals into # Only return special message if show_all is True if show_all or msg.name != 'VECTOR__INDEPENDENT_SIG_MSG': yield msg mh = ct.c_void_p() try: dll.kvaDbGetNextMsg(self._handle, ct.byref(mh)) except KvdNoMessage: return
[docs] def interpret(self, frame, j1939=False): """Interprets a given `canlib.Frame` object, returning a `BoundMessage`.""" can_id = frame.id flags = 0 if j1939: # When calling get_message_by_pgn, the EXT flag must be attached to the CAN ID # Check frame.flags first, since it may be '0' # Note that MessageFlag.EXT has different values in canlib vs kvadblib if frame.flags and canlib_MessageFlag.EXT in frame.flags: can_id |= MessageFlag.EXT message = self.get_message_by_pgn(can_id) return message.bind(frame) # When calling get_message_by_id, the EXT flag must be provided in flags argument # Note that MessageFlag.EXT has different values in canlib vs kvadblib # Check frame.flags first, since it may be '0' if frame.flags and canlib_MessageFlag.EXT in frame.flags: flags = flags | MessageFlag.EXT # Still using legacy version of get_message_by_id in order to be # backwards compliant. message = self.get_message_by_id(can_id, flags) return message.bind(frame)
[docs] def new_attribute_definition(self, name, owner, type, definition): """Create a new attribute definition in the database. The owner specify where the attribute is applicable, e.g. `AttributeOwner.MESSAGE` specifies that this attribute is only applicable on messages (`Message`). Args: name (str): a unique name. owner (`AttributeOwner`): the owner type Returns: `AttributeDefinition` """ adh = ct.c_void_p(None) dll.kvaDbAddAttributeDefinition(self._handle, ct.byref(adh)) dll.kvaDbSetAttributeDefinitionType(adh, type.value) dll.kvaDbSetAttributeDefinitionName(adh, name.encode('utf-8')) dll.kvaDbSetAttributeDefinitionOwner(adh, owner) atr_def = AttributeDefinition(self, adh, definition) return atr_def
[docs] def new_message(self, name, id, flags=0, dlc=None, comment=None): """Create a new message in the database. Args: name (str): name of message id (int): message id flags (int, optional): message flags, e.g. `kvadblib.MessageFlag.EXT` Returns: `~canlib.kvadblib.message.Message` """ mh = ct.c_void_p(None) dll.kvaDbAddMsg(self._handle, ct.byref(mh)) try: message = Message(self, mh, name, id, flags, dlc, comment) except KvdOnlyOneAllowed: # If KvdOnlyOneAllowed is thrown, the message was created but not # named properly, in which case we need to delete the message # before calling the users attention dll.kvaDbDeleteMsg(self._handle, mh) raise return message
[docs] def new_node(self, name, comment=None): """Create a new node in the database. Args: name (str): name of message comment (str, optional): message comment Returns: `Node` """ nh = ct.c_void_p(None) dll.kvaDbAddNode(self._handle, ct.byref(nh)) try: node = Node(self, nh, name, comment) except KvdOnlyOneAllowed: # If KvdOnlyOneAllowed is thrown, the node was created but not # named properly, in which case we need to delete the node # before calling the users attention dll.kvaDbDeleteNode(self._handle, nh) raise return node
[docs] def node_in_signal(self, node, signal): """Check if signal has been added to node. Returns: True: signals contains node False: otherwise """ try: dll.kvaDbSignalContainsReceiveNode(signal._handle, node._handle) except KvdNoNode: return False return True
[docs] def nodes(self): """Return a generator containing all database nodes.""" nh = ct.c_void_p(None) try: dll.kvaDbGetFirstNode(self._handle, ct.byref(nh)) except KvdNoNode: return while nh.value is not None: yield Node(self, nh) nh = ct.c_void_p() try: dll.kvaDbGetNextNode(self._handle, ct.byref(nh)) except KvdNoNode: return
[docs] def set_attribute_value(self, name, value): """Set value of attribute *name* on database. If no attribute called *name* is set on database, attach a database attribute from the database attribute definition first. .. versionadded:: 1.6 """ ah = ct.c_void_p() # Try and find attribute on database try: dll.kvaDbGetAttributeByName(self._handle, name.encode('utf-8'), ct.byref(ah)) except KvdNoAttribute: # If no attribute was found, lookup the attribute definition and # add a new attribute to the message attrib_def = self.get_attribute_definition_by_name(name) dll.kvaDbAddAttribute(self._handle, attrib_def._handle, ct.byref(ah)) # Set the value in the message attribute attribute = Attribute(self, ah) attribute.value = value
[docs] def write_file(self, filename): """Write a database to file. Args: filename (str): file to write database to """ dll.kvaDbWriteFile(self._handle, filename.encode('utf-8'))
@property def flags(self): """Get the database flags. E.g. DATABASE_FLAG_J1939 """ flags = ct.c_uint() dll.kvaDbGetFlags(self._handle, ct.byref(flags)) return flags.value @flags.setter def flags(self, value): """Set the database flags. E.g. DATABASE_FLAG_J1939 """ dll.kvaDbSetFlags(self._handle, value) @property def name(self): """`str`: The current database name (read-only)""" buf = ct.create_string_buffer(255) dll.kvaDbGetDatabaseName(self._handle, buf, ct.sizeof(buf)) return buf.value.decode('utf-8') @property def protocol(self): """`ProtocolType`: The database protocol""" p = ct.c_int() dll.kvaDbGetProtocol(self._handle, ct.byref(p)) return ProtocolType(p.value) @protocol.setter def protocol(self, value): dll.kvaDbSetProtocol(self._handle, value)