Send Random Messages on a Channel

"""pinger.py -- Send random CAN messages based on a database

This script uses canlib.canlib and canlib.kvadblib to send random messages from
a database with random data.

It requires a CANlib channel a connected device capable of sending CAN
messages, something that receives those messages, and a database to inspect for
the messages to send.

Messages can be received and printed by e.g. the dbmonitor.py example script.

"""
import argparse
import random
import time

from canlib import canlib, kvadblib

bitrates = {
    '1M': canlib.Bitrate.BITRATE_1M,
    '500K': canlib.Bitrate.BITRATE_500K,
    '250K': canlib.Bitrate.BITRATE_250K,
    '125K': canlib.Bitrate.BITRATE_125K,
    '100K': canlib.Bitrate.BITRATE_100K,
    '62K': canlib.Bitrate.BITRATE_62K,
    '50K': canlib.Bitrate.BITRATE_50K,
    '83K': canlib.Bitrate.BITRATE_83K,
    '10K': canlib.Bitrate.BITRATE_10K,
}


def set_random_framebox_signal(db, framebox, signals):
    sig = random.choice(signals)
    value = get_random_value(db, sig)
    framebox.signal(sig.name).phys = value


def get_random_value(db, sig):
    limits = sig.limits
    value = random.uniform(limits.min, limits.max)

    # round value depending on type...
    if sig.type is kvadblib.SignalType.UNSIGNED or sig.type is kvadblib.SignalType.SIGNED:
        # ...remove decimals if the signal was of type unsigned
        value = int(round(value))
    else:
        # ...otherwise, round to get only one decimal
        value = round(value, 1)

    return value


def ping_loop(channel_number, db_name, num_messages, quantity, interval, bitrate, seed=0):
    db = kvadblib.Dbc(filename=db_name)

    ch = canlib.openChannel(channel_number, bitrate=bitrate)
    ch.setBusOutputControl(canlib.canDRIVER_NORMAL)
    ch.busOn()

    random.seed(seed)

    if num_messages == -1:
        used_messages = list(db)
    else:
        used_messages = random.sample(list(db), num_messages)

    print()
    print("Randomly selecting signals from the following messages:")
    print(used_messages)
    print("Seed used was " + repr(seed))
    print()

    while True:
        # Create an empty framebox each time, ignoring previously set signal
        # values.
        framebox = kvadblib.FrameBox(db)

        # Add all messages to the framebox, as we may use send any signal from
        # any of them.
        for msg in db:
            framebox.add_message(msg.name)

        # Make a list of all signals (which framebox has found in all messages
        # we gave it), so that set_random_framebox_signal() can pick a random
        # one.
        signals = [bsig.signal for bsig in framebox.signals()]

        # Set some random signals to random values
        for i in range(quantity):
            set_random_framebox_signal(db, framebox, signals)

        # Send all messages/frames
        for frame in framebox.frames():
            print('Sending frame', frame)
            ch.writeWait(frame, timeout=5000)

        time.sleep(interval)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Send random CAN message based on a database.")
    parser.add_argument(
        'channel', type=int, default=0, nargs='?', help=("The channel to send messages on.")
    )
    parser.add_argument(
        '--bitrate', '-b', default='500k', help=("Bitrate, one of " + ', '.join(bitrates.keys()))
    )
    parser.add_argument(
        '--db', default="engine_example.dbc", help=("The database file to base messages on.")
    )
    parser.add_argument(
        '-Q', '--quantity', type=int, default=5, help=("The number of signals to send each tick.")
    )
    parser.add_argument(
        '-I', '--interval', type=float, default=0.2, help=("The time, in seconds, between ticks.")
    )
    parser.add_argument(
        '-n',
        '--num-messages',
        type=int,
        default=-1,
        help=("The number of message from the database to use, or -1 to use all."),
    )
    parser.add_argument(
        '-s',
        '--seed',
        nargs='?',
        default='0',
        help=(
            "The seed used for choosing messages. If possible, will be converted to an int. If no argument is given, a random seed will be used."
        ),
    )
    args = parser.parse_args()

    if args.seed is None:
        seed = None
    else:
        try:
            seed = int(args.seed)
        except ValueError:
            seed = args.seed

    ping_loop(
        channel_number=args.channel,
        db_name=args.db,
        num_messages=args.num_messages,
        quantity=args.quantity,
        interval=args.interval,
        bitrate=bitrates[args.bitrate.upper()],
        seed=args.seed,
    )

Description

Note

There must be some process reading the messages for pinger.py to work (see e.g. Monitor a Channel Using a Database).

ping_loop will first extract a random list of messages (see Randomness), and then enter a loop that creates a new canlib.kvadblib.FrameBox before adding some random signals with random values (the quantity specified by the quantity/--quantity argument).

Adding random signals is done with set_random_framebox_signal, which picks a random signal from the framebox, and get_random_value which inspects the given signal and provides a random value based on the signal’s definition.

Finally, the loop pauses for interval/--interval seconds between sending messages.

Randomness

The random selection of messages is done with the seed/--seed and num_messages/num-messages arguments. If num_messages is -1, all messages from the database will be used. Otherwise, num_message specifies the number of messages to be randomly picked from the database.

The seed argument will be sent to random.seed before the messages are selected (which is done with random.sample), which means as long as the seed remains the same, the same messages are selected. The seed can also be set to None for a pseudo-random seed.

Sample Output

Randomly selecting signals from the following messages:
[Message(name='EngineData', id=100, flags=<MessageFlag.0: 0>, dlc=8, comment=''), Message(name='GearBoxInfo', id=1020, flags=<MessageFlag.0: 0>, dlc=1, comment='')]
Seed used was '0'

Sending frame Frame(id=1020, data=bytearray(b'\x00'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'\x00\x00\x16]\x00\x00\x00\x00'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=1020, data=bytearray(b'\x00'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'\x00\x00\x00\xdd\x00\x00\x00\x00'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=1020, data=bytearray(b'\x00'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'\x00\x00\x00\xe0\x00\x00`\t'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=1020, data=bytearray(b'\x04'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'f\x07\n\x00\x00\x00\x00\x00'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=1020, data=bytearray(b'\x00'), dlc=1, flags=<MessageFlag.0: 0>, timestamp=None)
Sending frame Frame(id=100, data=bytearray(b'\x0c\x15-\x00\x00\x00\x00\x00'), dlc=8, flags=<MessageFlag.0: 0>, timestamp=None)