Note

This Python sample has a corresponding C++ sample.

Synced Camera Streams Slicer using Python

The Synced Camera Streams Slicer sample demonstrates how to use the SyncedCameraStreamsSlicer class from the Metavision SDK to slice and process the event streams produced by master and slave Camera instances. Slices can be made based on either a fixed number of events or a fixed time duration.

Expected Output

Synced Camera Streams Slicer sample output

How to start

To start the sample based on recorded data, provide the full paths to synchronized event files (here, we use HDF5 files from our Sample Recordings):

python metavision_synced_camera_streams_slicer.py -i courtyard_walk_stereo.master.hdf5 courtyard_walk_stereo.slave.hdf5

This sample cannot simply be run with the first available cameras, as it needs to know which camera is the master and which ones are the slaves. To start the sample based on the live streams from your cameras, run:

python metavision_synced_camera_streams_slicer.py -s SN_MASTER SN_SLAVE_1 ... SN_SLAVE_N

To check for additional options:

python metavision_synced_camera_streams_slicer.py -h
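
For context, these flags map to the attributes used in the code overview below (args.input_event_files, args.camera_serial_numbers, etc.). The following argparse sketch is a hedged approximation of how the sample could wire its options, not its exact code; flag spellings and defaults are assumptions, and the slicing-condition option is omitted here:

import argparse

def parse_args():
    parser = argparse.ArgumentParser(description="Synced Camera Streams Slicer sample")
    parser.add_argument("-i", "--input-event-files", dest="input_event_files", nargs="+", default=[],
                        help="Paths to the synchronized recordings, master file first")
    parser.add_argument("-s", "--camera-serial-numbers", dest="camera_serial_numbers", nargs="+", default=[],
                        help="Serial numbers of the live cameras, master first")
    parser.add_argument("--config-path", dest="config_path", default=".",
                        help="Directory holding per-camera <serial_number>.json settings files")
    parser.add_argument("--record", dest="record", action="store_true",
                        help="Record the live streams to disk")
    parser.add_argument("--record-path", dest="record_path", default=".",
                        help="Directory where recordings are written")
    parser.add_argument("--real-time-playback", dest="real_time_playback", action="store_true",
                        help="Play back recordings at their original speed")
    return parser.parse_args()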

Code Overview

First, we define a CameraView class that holds the frame and the window for each camera and implements the processing logic for the slices (i.e. generating a frame from the events and displaying it in the window):

import numpy as np

from metavision_sdk_core import BaseFrameGenerationAlgorithm
from metavision_sdk_ui import BaseWindow, EventLoop, MTWindow, UIKeyEvent


class CameraView:
    """
    Class to display a camera's events slice in a window
    """

    def __init__(self, camera, name):
        """
        Constructor

        Args:
            camera: Camera to display
            name: Name of the window
        """
        width = camera.width()
        height = camera.height()
        self.frame = np.zeros((height, width, 3), np.uint8)
        self.window = MTWindow(name, width, height, BaseWindow.RenderMode.BGR, True)

        def keyboard_cb(key, scancode, action, mods):
            """
            Keyboard callback

            Args:
                key: Key pressed
                scancode: Scancode
                action: Action (press, release)
                mods: Mods (shift, ctrl, alt)
            """
            if key == UIKeyEvent.KEY_ESCAPE or key == UIKeyEvent.KEY_Q:
                self.window.set_close_flag()

        self.window.set_keyboard_callback(keyboard_cb)

    def process(self, events):
        """
        Generates a frame from the events and displays it in the window

        Args:
            events: Events to display
        """
        BaseFrameGenerationAlgorithm.generate_frame(events, self.frame)
        self.window.show_async(self.frame)


Then, we build the camera system, that is, the master and slave cameras. To do so, we rely on the SyncedCameraSystemBuilder helper class: we pass all the provided command line arguments to it and then call its build() method to create the camera system. Internally, the builder determines whether a live or offline camera system should be created based on the provided arguments, and builds and configures all the Camera instances accordingly:

from pathlib import Path
from typing import Optional

from metavision_sdk_stream import FileConfigHints, SyncedCameraSystemBuilder

builder = SyncedCameraSystemBuilder()

def get_settings_file_path(config_dir, serial_number) -> Optional[Path]:
    settings_file_path = Path(config_dir) / f"{serial_number}.json"
    if not settings_file_path.exists():
        return None
    return settings_file_path

for sn in args.camera_serial_numbers:
    print(f"Adding camera with serial number {sn}")
    settings_file_path = get_settings_file_path(args.config_path, sn)
    builder.add_live_camera_parameters(serial_number=sn, settings_file_path=settings_file_path)

builder.set_record(args.record)
builder.set_record_dir(args.record_path)

for record in args.input_event_files:
    builder.add_record_path(record)

hints = FileConfigHints()
hints.real_time_playback(args.real_time_playback)

builder.set_file_config_hints(hints)

master, slaves = builder.build()

Next, we create a SyncedCameraStreamsSlicer instance by transferring ownership of the camera system to it and setting the desired slicing condition:

from metavision_sdk_stream import SyncedCameraStreamsSlicer

slicer = SyncedCameraStreamsSlicer(master.move(), [slave.move() for slave in slaves], args.slice_condition)
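
The slice_condition value determines when each synchronized slice is emitted. Here is a minimal sketch of how such a condition could be built, assuming the SliceCondition factory exposed by metavision_sdk_stream (the same one used by the non-synced CameraStreamSlicer; verify the exact API for your SDK version):

from metavision_sdk_stream import SliceCondition

# Emit a slice every 10 ms of camera time...
slice_condition = SliceCondition.make_n_us(10000)

# ...or every 50 000 events instead:
# slice_condition = SliceCondition.make_n_events(50000)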

Note

The camera instances are transferred to the slicer in a way similar to C++ move semantics. Any variable holding a moved camera instance is invalid from this point on and must not be used anymore (doing so is undefined behavior). Under the hood, the move() function returns a special object that holds the ownership of the camera instance. This object can only be used to instantiate a slicer, and an exception is thrown if it is used more than once.
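
To make this rule concrete, here is a hedged illustration; the exact exception type is not specified by the note above, so a broad except is used:

moved = master.move()  # 'master' must no longer be used from this point on
slicer = SyncedCameraStreamsSlicer(moved, [slave.move() for slave in slaves], args.slice_condition)

try:
    SyncedCameraStreamsSlicer(moved, [], args.slice_condition)  # 'moved' was already consumed
except Exception:  # reusing the moved object raises
    print("Camera ownership was already transferred")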

Next, we build a CameraView instance for each camera:

views = [CameraView(slicer.master(), "Master")]

for i in range(slicer.slaves_count()):
    views.append(CameraView(slicer.slave(i), f"Slave {i}"))

Finally, we start slicing the synchronized event streams by iterating over the slicer. Information about each slice is printed to the console, and each camera's slice is processed and displayed in its own view:

for slice in slicer:
    # Keep the UI responsive by dispatching pending system events
    EventLoop.poll_and_dispatch()

    log_slice_info(slice)

    # The first view renders the master's events...
    views[0].process(slice.master_events)

    # ...and the remaining views render each slave's events
    for i in range(slicer.slaves_count()):
        views[i + 1].process(slice.slave_events[i])

    if should_exit(views):
        break
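
For completeness, the loop relies on two helpers that are not shown in this overview. Below is a minimal sketch of plausible implementations: should_close() on the window is part of the UI API, while the t and n_events fields read by log_slice_info are assumptions about the slice object (only master_events and slave_events appear in the code above):

def should_exit(views):
    # Stop once any window was asked to close (ESC/Q callback or the window button)
    return any(view.window.should_close() for view in views)


def log_slice_info(slice):
    # Hypothetical fields: 't' (slice timestamp) and 'n_events' (master event count)
    print(f"ts: {slice.t} us, master events: {slice.n_events}, "
          f"slaves: {len(slice.slave_events)}")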