Send Random Messages on CAN Channel¶
"""pinger.py -- Send random CAN messages based on a database
This script uses canlib.canlib and canlib.kvadblib to send random messages from
a database with random data.
It requires a CANlib channel a connected device capable of sending CAN
messages, something that receives those messages, and a database to inspect for
the messages to send.
Messages can be received and printed by e.g. the dbmonitor.py example script.
"""
import argparse
import random
import time
from canlib import canlib, kvadblib
# Create a dictionary of predefined CAN bitrates, using the name after
# "BITRATE_" as key. E.g. "500K".
bitrates = {x.name.replace("BITRATE_", ""): x for x in canlib.Bitrate}
def set_random_framebox_signal(db, framebox, signals):
sig = random.choice(signals)
value = get_random_value(db, sig)
framebox.signal(sig.name).phys = value
def get_random_value(db, sig):
limits = sig.limits
value = random.uniform(limits.min, limits.max)
# round value depending on type...
if sig.type is kvadblib.SignalType.UNSIGNED or sig.type is kvadblib.SignalType.SIGNED:
# ...remove decimals if the signal was of type unsigned
value = int(round(value))
else:
# ...otherwise, round to get only one decimal
value = round(value, 1)
return value
def ping_loop(ch, db, num_messages, quantity, interval, seed=None):
if seed is None:
seed = 0
else:
try:
seed = int(seed)
except ValueError:
seed = seed
random.seed(seed)
if num_messages == -1:
used_messages = list(db)
else:
used_messages = random.sample(list(db), num_messages)
print()
print("Randomly selecting signals from the following messages:")
print(used_messages)
print("Seed used was " + repr(seed))
print()
while True:
# Create an empty framebox each time, ignoring previously set signal
# values.
framebox = kvadblib.FrameBox(db)
# Add all messages to the framebox, as we may use send any signal from
# any of them.
for msg in db:
# If the channel is opened as CAN FD, we ignore messages in dbc
# that is not marked as CAN FD. Similarily, if the channel is
# opened as CAN, we inore messages in dbc that is marked as CAN FD.
if ch.is_can_fd() != (canlib.MessageFlag.FDF in msg.canflags):
continue
framebox.add_message(msg.name)
# Make a list of all signals (which framebox has found in all messages
# we gave it), so that set_random_framebox_signal() can pick a random
# one.
signals = [bsig.signal for bsig in framebox.signals()]
# Set some random signals to random values
for i in range(quantity):
set_random_framebox_signal(db, framebox, signals)
# Send all messages/frames
for frame in framebox.frames():
print('Sending frame', frame)
ch.writeWait(frame, timeout=5000)
time.sleep(interval)
def parse_args(argv):
parser = argparse.ArgumentParser(
description="Send random CAN message based on a database.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
'channel', type=int, default=0, nargs='?', help=("The channel to send messages on.")
)
parser.add_argument(
'--bitrate', '-b', default='500k', help=("Bitrate, one of " + ', '.join(bitrates.keys()))
)
parser.add_argument(
'--db', default="engine_example.dbc", help=("The database file to base messages on.")
)
parser.add_argument(
'-Q', '--quantity', type=int, default=5, help=("The number of signals to send each tick.")
)
parser.add_argument(
'-I', '--interval', type=float, default=0.2, help=("The time, in seconds, between ticks.")
)
parser.add_argument(
'-n',
'--num-messages',
type=int,
default=-1,
help=("The number of message from the database to use, or -1 to use all."),
)
parser.add_argument(
'-s',
'--seed',
nargs='?',
default='0',
help=(
"The seed used for choosing messages. If possible, will be converted to an int."
" If no argument is given, a random seed will be used."
),
)
args = parser.parse_args()
return args
def main(argv=None):
args = parse_args(argv)
db = kvadblib.Dbc(filename=args.db)
ch = canlib.openChannel(args.channel, bitrate=bitrates[args.bitrate.upper()])
ch.setBusOutputControl(canlib.canDRIVER_NORMAL)
ch.busOn()
ping_loop(
ch=ch,
db=db,
num_messages=args.num_messages,
quantity=args.quantity,
interval=args.interval,
seed=args.seed,
)
if __name__ == '__main__':
raise SystemExit(main())
Description¶
Note
There must be some process reading the messages for pinger.py
to work
(see e.g. Monitor Channel Using CAN Database).
ping_loop
will first extract a random list of messages (see Randomness), and then enter a loop that creates a new FrameBox
before adding some random signals with random values (the quantity specified by the quantity
/--quantity
argument).
Adding random signals is done with set_random_framebox_signal
, which picks a random signal from the framebox, and get_random_value
which inspects the given signal and provides a random value based on the signal’s definition.
Finally, the loop pauses for interval
/--interval
seconds between sending messages.
Randomness¶
The random selection of messages is done with the seed
/--seed
and num_messages
/num-messages
arguments. If num_messages
is -1
, all messages from the database will be used. Otherwise, num_message
specifies the number of messages to be randomly picked from the database.
The seed
argument will be sent to random.seed
before the messages are
selected (which is done with random.sample
), which means as long as the seed
remains the same, the same messages are selected. The seed
can also be set to
None
for a pseudo-random seed.
Sample Output¶
Randomly selecting signals from the following messages:
[Message(name='EngineData', id=100, flags=<MessageFlag.0: 0>, dlc=8, comment=''), Message(name='GearBoxInfo', id=1020, flags=<MessageFlag.0: 0>, dlc=1, comment='')]
Seed used was '0'
Sending frame Frame(id=1020, data=bytearray(b'\x00'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'\x00\x00\x16]\x00\x00\x00\x00'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=1020, data=bytearray(b'\x00'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'\x00\x00\x00\xdd\x00\x00\x00\x00'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=1020, data=bytearray(b'\x00'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'\x00\x00\x00\xe0\x00\x00`\t'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=1020, data=bytearray(b'\x04'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'f\x07\n\x00\x00\x00\x00\x00'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=1020, data=bytearray(b'\x00'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'\x0c\x15-\x00\x00\x00\x00\x00'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)
CAN FD version¶
This example is basically the same as pinger.py
above, except we are now using CAN FD.
Note that you also need the pinger.py
file, next to pingerfd.py
below, since we are reusing the ping_loop
function.
"""pingerfd.py -- Send random CAN FD messages based on a database
This script uses canlib.canlib and canlib.kvadblib to send random messages from
a database with random data.
The script also reuses the ping_loop function defined in pinger.py
It requires a CANlib channel a connected device capable of sending CAN
messages, something that receives those messages, and a database to inspect for
the messages to send.
Messages can be received and printed by e.g. the dbmonitorfd.py example script.
"""
import argparse
import random
import time
from canlib import canlib, kvadblib
import pinger
# Create a dictionary of predefined CAN FD bitrates, using the name after
# "BITRATE_" as key. E.g. "500K_80P".
fd_bitrates = {x.name.replace("BITRATE_", ""): x for x in canlib.BitrateFD}
def parse_args(argv):
parser = argparse.ArgumentParser(
description="Send random CAN FD message based on a database.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
'channel', type=int, default=0, nargs='?', help=("The channel to send messages on.")
)
parser.add_argument(
'--fdbitrate', '-f', default=['500k_80p', '1M_80p'], nargs=2,
help=("CAN FD arbitration and data bitrate pair (e.g. -f 500k_80p 1M_80p), two of " + ', '.join(fd_bitrates.keys()))
)
parser.add_argument(
'--db', default="engine_example.dbc", help=("The database file to base messages on.")
)
parser.add_argument(
'-Q', '--quantity', type=int, default=5, help=("The number of signals to send each tick.")
)
parser.add_argument(
'-I', '--interval', type=float, default=0.2, help=("The time, in seconds, between ticks.")
)
parser.add_argument(
'-n',
'--num-messages',
type=int,
default=-1,
help=("The number of message from the database to use, or -1 to use all."),
)
parser.add_argument(
'-s',
'--seed',
nargs='?',
default='0',
help=(
"The seed used for choosing messages. If possible, will be converted to an int. If no argument is given, a random seed will be used."
),
)
args = parser.parse_args()
if args.seed is not None:
try:
args.seed = int(args.seed)
except ValueError:
args.seed = args.seed
return args
def main(argv=None):
args = parse_args(argv)
db = kvadblib.Dbc(filename=args.db)
print(f"{args.channel=}, {fd_bitrates[args.fdbitrate[0].upper()]=}, {fd_bitrates[args.fdbitrate[1].upper()]=}")
ch = canlib.openChannel(
args.channel,
flags=canlib.Open.CAN_FD,
bitrate=fd_bitrates[args.fdbitrate[0].upper()],
data_bitrate=fd_bitrates[args.fdbitrate[1].upper()],
)
ch.setBusOutputControl(canlib.canDRIVER_NORMAL)
ch.busOn()
pinger.ping_loop(
ch=ch,
db=db,
num_messages=args.num_messages,
quantity=args.quantity,
interval=args.interval,
seed=args.seed,
)
if __name__ == '__main__':
raise SystemExit(main())