Send Random Messages on CAN Channel

"""pinger.py -- Send random CAN messages based on a database

This script uses canlib.canlib and canlib.kvadblib to send random messages from
a database with random data.

It requires a CANlib channel with a connected device capable of sending CAN
messages, something that receives those messages, and a database to inspect for
the messages to send.

Messages can be received and printed by e.g. the dbmonitor.py example script.

"""
import argparse
import random
import time

from canlib import canlib, kvadblib

# Lookup table of the predefined CAN bitrates. Each key is the enum member's
# name with the "BITRATE_" part removed, e.g. "500K".
bitrates = {rate.name.replace("BITRATE_", ""): rate for rate in canlib.Bitrate}


def set_random_framebox_signal(db, framebox, signals):
    """Give one randomly chosen signal in `framebox` a random physical value.

    One entry of `signals` is picked with `random.choice`, a value for it is
    obtained from `get_random_value`, and that value is written to the `phys`
    attribute of the signal with the same name in `framebox`.
    """
    chosen = random.choice(signals)
    # The right-hand side is evaluated before the framebox lookup, so the
    # random value is drawn first.
    framebox.signal(chosen.name).phys = get_random_value(db, chosen)


def get_random_value(db, sig):
    """Return a random value within the limits of `sig`.

    The value is drawn uniformly between `sig.limits.min` and
    `sig.limits.max`. For UNSIGNED and SIGNED signals it is rounded to the
    nearest integer; for every other signal type it is rounded to one decimal.

    `db` is accepted for interface compatibility but is not used here.
    """
    bounds = sig.limits
    raw = random.uniform(bounds.min, bounds.max)

    is_unsigned = sig.type is kvadblib.SignalType.UNSIGNED
    if is_unsigned or sig.type is kvadblib.SignalType.SIGNED:
        # Integer signal types: drop the decimals.
        return int(round(raw))

    # Any other signal type: keep a single decimal.
    return round(raw, 1)


def ping_loop(ch, db, num_messages, quantity, interval, seed=None):
    """Send frames with random signal values on `ch`, forever.

    A set of messages is selected from `db` once. Then, every tick, a fresh
    framebox is filled with the selected messages, `quantity` randomly chosen
    signals get random values, all frames are written to the channel, and the
    loop sleeps for `interval` seconds.

    Args:
        ch: The channel to send on; `is_can_fd()` and `writeWait()` are
            called on it.
        db: The database to take messages from. It is iterated for its
            messages and passed to `kvadblib.FrameBox`.
        num_messages: Number of messages to pick at random from `db`, or -1
            to use every message.
        quantity: Number of random signal assignments per tick.
        interval: Seconds to sleep between ticks.
        seed: Seed handed to `random.seed` before the messages are selected.
            It is converted to an int when possible. If None, a fresh random
            seed is generated (and printed, so the run can be repeated).

    This function does not return; it runs until an exception (for example
    KeyboardInterrupt or a failed write) propagates out of it.
    """
    if seed is None:
        # Draw a concrete seed instead of seeding from system state, so that
        # the value can be printed below and reused to reproduce the run.
        seed = random.randrange(2**32)
    else:
        try:
            seed = int(seed)
        except ValueError:
            # Non-numeric seeds are passed to random.seed() unchanged.
            pass
    random.seed(seed)

    if num_messages == -1:
        used_messages = list(db)
    else:
        used_messages = random.sample(list(db), num_messages)

    print()
    print("Randomly selecting signals from the following messages:")
    print(used_messages)
    print("Seed used was " + repr(seed))
    print()

    # If the channel is opened as CAN FD, we ignore selected messages that are
    # not marked as CAN FD in the database. Similarly, if the channel is
    # opened as CAN, we ignore messages that are marked as CAN FD. Neither the
    # channel mode nor the selection changes inside the loop, so the filter is
    # evaluated once here.
    channel_is_fd = ch.is_can_fd()
    message_names = [
        msg.name
        for msg in used_messages
        if channel_is_fd == (canlib.MessageFlag.FDF in msg.canflags)
    ]

    while True:
        # Create an empty framebox each time, ignoring previously set signal
        # values.
        framebox = kvadblib.FrameBox(db)

        # Add every selected message to the framebox, as we may send any
        # signal from any of them.
        for name in message_names:
            framebox.add_message(name)

        # Make a list of all signals (which framebox has found in all messages
        # we gave it), so that set_random_framebox_signal() can pick a random
        # one.
        signals = [bsig.signal for bsig in framebox.signals()]

        # Set some random signals to random values. With no signals available
        # there is nothing to choose from (random.choice would raise
        # IndexError), so the frames are sent as they are.
        if signals:
            for _ in range(quantity):
                set_random_framebox_signal(db, framebox, signals)

        # Send all messages/frames
        for frame in framebox.frames():
            print('Sending frame', frame)
            ch.writeWait(frame, timeout=5000)

        time.sleep(interval)


def parse_args(argv):
    """Parse the command-line options of pinger.py.

    Args:
        argv: Sequence of argument strings to parse, or None to let argparse
            read them from ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace`` holding ``channel``, ``bitrate``, ``db``,
        ``quantity``, ``interval``, ``num_messages`` and ``seed``.
    """
    parser = argparse.ArgumentParser(
        description="Send random CAN message based on a database.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        'channel', type=int, default=0, nargs='?', help=("The channel to send messages on.")
    )
    parser.add_argument(
        '--bitrate', '-b', default='500k', help=("Bitrate, one of " + ', '.join(bitrates.keys()))
    )
    parser.add_argument(
        '--db', default="engine_example.dbc", help=("The database file to base messages on.")
    )
    parser.add_argument(
        '-Q', '--quantity', type=int, default=5, help=("The number of signals to send each tick.")
    )
    parser.add_argument(
        '-I', '--interval', type=float, default=0.2, help=("The time, in seconds, between ticks.")
    )
    parser.add_argument(
        '-n',
        '--num-messages',
        type=int,
        default=-1,
        help=("The number of message from the database to use, or -1 to use all."),
    )
    # nargs='?' without a const: a bare "-s" yields None, while omitting the
    # option altogether yields the default '0'.
    parser.add_argument(
        '-s',
        '--seed',
        nargs='?',
        default='0',
        help=(
            "The seed used for choosing messages. If possible, will be converted to an int."
            " If no argument is given, a random seed will be used."
        ),
    )
    # Honour the caller-supplied argument list; None makes argparse fall back
    # to sys.argv[1:].
    return parser.parse_args(argv)


def main(argv=None):
    """Open the database and the channel, go on bus and run `ping_loop`.

    Args:
        argv: Argument list forwarded to `parse_args`; None by default.

    `ping_loop` only ends by raising (for example on Ctrl-C), so the channel
    is taken off bus and closed, and the database is closed, on the way out.
    """
    args = parse_args(argv)
    db = kvadblib.Dbc(filename=args.db)
    try:
        ch = canlib.openChannel(args.channel, bitrate=bitrates[args.bitrate.upper()])
        try:
            ch.setBusOutputControl(canlib.canDRIVER_NORMAL)
            ch.busOn()

            ping_loop(
                ch=ch,
                db=db,
                num_messages=args.num_messages,
                quantity=args.quantity,
                interval=args.interval,
                seed=args.seed,
            )
        finally:
            # Release the channel however the loop ended.
            ch.busOff()
            ch.close()
    finally:
        db.close()


# Run main() only when executed as a script; its return value becomes the
# process exit status.
if __name__ == '__main__':
    raise SystemExit(main())

Description

Note

There must be some process reading the messages for pinger.py to work (see e.g. Monitor Channel Using CAN Database).

ping_loop will first extract a random list of messages (see Randomness), and then enter a loop that creates a new FrameBox before adding some random signals with random values (the quantity specified by the quantity/--quantity argument).

Adding random signals is done with set_random_framebox_signal, which picks a random signal from the framebox, and get_random_value, which inspects the given signal and provides a random value based on the signal’s definition.

Finally, the loop pauses for interval/--interval seconds between sending messages.

Randomness

The random selection of messages is controlled by the seed/--seed and num_messages/--num-messages arguments. If num_messages is -1, all messages from the database will be used. Otherwise, num_messages specifies the number of messages to be randomly picked from the database.

The seed argument will be sent to random.seed before the messages are selected (which is done with random.sample), which means as long as the seed remains the same, the same messages are selected. The seed can also be set to None for a pseudo-random seed.

Sample Output

Randomly selecting signals from the following messages:
[Message(name='EngineData', id=100, flags=<MessageFlag.0: 0>, dlc=8, comment=''), Message(name='GearBoxInfo', id=1020, flags=<MessageFlag.0: 0>, dlc=1, comment='')]
Seed used was '0'

Sending frame Frame(id=1020, data=bytearray(b'\x00'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'\x00\x00\x16]\x00\x00\x00\x00'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=1020, data=bytearray(b'\x00'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'\x00\x00\x00\xdd\x00\x00\x00\x00'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=1020, data=bytearray(b'\x00'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'\x00\x00\x00\xe0\x00\x00`\t'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=1020, data=bytearray(b'\x04'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'f\x07\n\x00\x00\x00\x00\x00'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=1020, data=bytearray(b'\x00'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'\x0c\x15-\x00\x00\x00\x00\x00'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)

CAN FD version

This example is basically the same as pinger.py above, except we are now using CAN FD.

Note that you also need the pinger.py file, next to pingerfd.py below, since we are reusing the ping_loop function.

"""pingerfd.py -- Send random CAN FD messages based on a database

This script uses canlib.canlib and canlib.kvadblib to send random messages from
a database with random data.

The script also reuses the ping_loop function defined in pinger.py

It requires a CANlib channel with a connected device capable of sending CAN
messages, something that receives those messages, and a database to inspect for
the messages to send.

Messages can be received and printed by e.g. the dbmonitorfd.py example script.

"""
import argparse
import random
import time

from canlib import canlib, kvadblib

import pinger

# Lookup table of the predefined CAN FD bitrates. Each key is the enum
# member's name with the "BITRATE_" part removed, e.g. "500K_80P".
fd_bitrates = {rate.name.replace("BITRATE_", ""): rate for rate in canlib.BitrateFD}


def parse_args(argv):
    """Parse the command-line options of pingerfd.py.

    Args:
        argv: Sequence of argument strings to parse, or None to let argparse
            read them from ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace`` holding ``channel``, ``fdbitrate`` (a list
        of two bitrate names), ``db``, ``quantity``, ``interval``,
        ``num_messages`` and ``seed``. ``seed`` is converted to an int when
        possible.
    """
    parser = argparse.ArgumentParser(
        description="Send random CAN FD message based on a database.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        'channel', type=int, default=0, nargs='?', help=("The channel to send messages on.")
    )
    parser.add_argument(
        '--fdbitrate', '-f', default=['500k_80p', '1M_80p'], nargs=2,
        help=("CAN FD arbitration and data bitrate pair (e.g. -f 500k_80p 1M_80p), two of " + ', '.join(fd_bitrates.keys()))
    )
    parser.add_argument(
        '--db', default="engine_example.dbc", help=("The database file to base messages on.")
    )
    parser.add_argument(
        '-Q', '--quantity', type=int, default=5, help=("The number of signals to send each tick.")
    )
    parser.add_argument(
        '-I', '--interval', type=float, default=0.2, help=("The time, in seconds, between ticks.")
    )
    parser.add_argument(
        '-n',
        '--num-messages',
        type=int,
        default=-1,
        help=("The number of message from the database to use, or -1 to use all."),
    )
    # nargs='?' without a const: a bare "-s" yields None, while omitting the
    # option altogether yields the default '0'.
    parser.add_argument(
        '-s',
        '--seed',
        nargs='?',
        default='0',
        help=(
            "The seed used for choosing messages. If possible, will be converted to an int. If no argument is given, a random seed will be used."
        ),
    )
    # Honour the caller-supplied argument list; None makes argparse fall back
    # to sys.argv[1:].
    args = parser.parse_args(argv)

    if args.seed is not None:
        try:
            args.seed = int(args.seed)
        except ValueError:
            # Non-numeric seeds are kept as the string that was given.
            pass
    return args


def main(argv=None):
    """Open the database and a CAN FD channel, go on bus and run the ping loop.

    Args:
        argv: Argument list forwarded to `parse_args`; None by default.

    The first entry of ``args.fdbitrate`` is used as the `bitrate` argument
    and the second as the `data_bitrate` argument of `canlib.openChannel`.
    The loop in `pinger.ping_loop` only ends by raising (for example on
    Ctrl-C), so the channel is taken off bus and closed, and the database is
    closed, on the way out.
    """
    args = parse_args(argv)
    db = kvadblib.Dbc(filename=args.db)
    try:
        print(f"{args.channel=}, {fd_bitrates[args.fdbitrate[0].upper()]=}, {fd_bitrates[args.fdbitrate[1].upper()]=}")
        ch = canlib.openChannel(
            args.channel,
            flags=canlib.Open.CAN_FD,
            bitrate=fd_bitrates[args.fdbitrate[0].upper()],
            data_bitrate=fd_bitrates[args.fdbitrate[1].upper()],
        )
        try:
            ch.setBusOutputControl(canlib.canDRIVER_NORMAL)
            ch.busOn()

            pinger.ping_loop(
                ch=ch,
                db=db,
                num_messages=args.num_messages,
                quantity=args.quantity,
                interval=args.interval,
                seed=args.seed,
            )
        finally:
            # Release the channel however the loop ended.
            ch.busOff()
            ch.close()
    finally:
        db.close()


# Run main() only when executed as a script; its return value becomes the
# process exit status.
if __name__ == '__main__':
    raise SystemExit(main())