Note
This Python sample has a corresponding C++ sample.
Synced Camera Streams Slicer using Python
The Synced Camera Streams Slicer sample demonstrates how to use the SyncedCameraStreamsSlicer
class from the Metavision SDK to slice and process the event streams produced by a master Camera instance and one or
more slave Camera instances. Slices can be defined either by a fixed number of events or by a fixed time duration.
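To make the two slicing modes concrete, here is a minimal sketch in plain Python that cuts a sorted list of event timestamps (in microseconds) either every N events or every fixed duration. The function names are hypothetical and this is not how SyncedCameraStreamsSlicer is implemented; it only illustrates the idea.

```python
from bisect import bisect_left


def slice_by_count(timestamps, n_events):
    """Yield consecutive chunks holding at most n_events timestamps each."""
    for start in range(0, len(timestamps), n_events):
        yield timestamps[start:start + n_events]


def slice_by_duration(timestamps, delta_t_us):
    """Yield consecutive chunks, each covering delta_t_us microseconds.

    Assumes the timestamps are sorted. Time windows that contain no
    event produce an empty chunk.
    """
    if not timestamps:
        return
    t_begin = timestamps[0]
    while t_begin <= timestamps[-1]:
        t_end = t_begin + delta_t_us
        lo = bisect_left(timestamps, t_begin)
        hi = bisect_left(timestamps, t_end)
        yield timestamps[lo:hi]
        t_begin = t_end


timestamps = [0, 100, 250, 900, 1000, 1500, 1999, 2000, 2100, 4500]
count_slices = [len(s) for s in slice_by_count(timestamps, 4)]
time_slices = [len(s) for s in slice_by_duration(timestamps, 1000)]
print(count_slices)
print(time_slices)
```

With a count condition every slice has the same size (except possibly the last one), whereas with a duration condition the slice size follows the activity in the scene.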
Expected Output
How to start
To start the sample based on recorded data, provide the full paths to the synchronized event files (here, we use files from our Sample Recordings):
python metavision_synced_camera_streams_slicer.py -i courtyard_walk_stereo.master.hdf5 courtyard_walk_stereo.slave.hdf5
This sample cannot be run with the first available cameras as it needs to know which camera is the master and which are the slaves. To start the sample based on the live stream from your cameras, run:
python metavision_synced_camera_streams_slicer.py -s SN_MASTER SN_SLAVE_1 ... SN_SLAVE_N
To check for additional options:
python metavision_synced_camera_streams_slicer.py -h
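As a rough illustration of how the two ways of starting the sample could be parsed, the sketch below uses argparse with the -i and -s options shown above. The long option names, the help texts and the choice to make the two options mutually exclusive are assumptions made for this example; the attribute names match the args fields used later in this page, but the real sample exposes more options (see -h).

```python
import argparse


def parse_args(argv=None):
    """Parse a reduced, illustrative subset of the sample's command line."""
    parser = argparse.ArgumentParser(
        description="Synced camera streams slicer (illustrative CLI sketch)")
    # Assumption: the sample reads either recordings or live cameras, not both.
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument(
        "-i", "--input-event-files", nargs="+", default=[],
        help="Paths to the synchronized recordings (hypothetical long name).")
    source.add_argument(
        "-s", "--camera-serial-numbers", nargs="+", default=[],
        help="Serial numbers of the live cameras (hypothetical long name).")
    return parser.parse_args(argv)


offline_args = parse_args(["-i", "master.hdf5", "slave.hdf5"])
live_args = parse_args(["-s", "SN_MASTER", "SN_SLAVE_1"])
print(offline_args.input_event_files)
print(live_args.camera_serial_numbers)
```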
Code Overview
First we define a CameraView class that will hold the frame and window for each camera and implement the processing logic for the slices (i.e. generating a frame from the events and displaying it in the window):
class CameraView:
    """
    Class to display a camera's events slice in a window
    """

    def __init__(self, camera, name):
        """
        Constructor

        Args:
            camera: Camera to display
            name: Name of the window
        """
        width = camera.width()
        height = camera.height()
        self.frame = np.zeros((height, width, 3), np.uint8)
        self.window = MTWindow(name, width, height, BaseWindow.RenderMode.BGR, True)

        def keyboard_cb(key, scancode, action, mods):
            """
            Keyboard callback

            Args:
                key: Key pressed
                scancode: Scancode
                action: Action (press, release)
                mods: Mods (shift, ctrl, alt)
            """
            if key == UIKeyEvent.KEY_ESCAPE or key == UIKeyEvent.KEY_Q:
                self.window.set_close_flag()

        self.window.set_keyboard_callback(keyboard_cb)

    def process(self, events):
        """
        Generates a frame from the events and displays it in the window

        Args:
            events: Events to display
        """
        BaseFrameGenerationAlgorithm.generate_frame(events, self.frame)
        self.window.show_async(self.frame)
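To give an intuition of what generating a frame from a slice of events means, the following NumPy sketch paints each event's pixel with a color that depends on its polarity. It is a conceptual stand-in, not the SDK's generate_frame implementation: the event dtype, the render_events name and the colors are assumptions chosen for illustration.

```python
import numpy as np

# Assumed event layout: pixel coordinates, polarity (0 or 1), timestamp in us.
EVENT_DTYPE = np.dtype([("x", "<u2"), ("y", "<u2"), ("p", "<i2"), ("t", "<i8")])

# Illustrative BGR colors, not necessarily the SDK's palette.
BACKGROUND = (0, 0, 0)
ON_COLOR = (255, 255, 255)
OFF_COLOR = (255, 0, 0)


def render_events(events, frame):
    """Reset the frame, then color the pixel of every event by its polarity."""
    frame[...] = BACKGROUND
    is_on = (events["p"] == 1)[:, None]  # shape (N, 1), broadcast over BGR
    colors = np.where(is_on,
                      np.array(ON_COLOR, dtype=np.uint8),
                      np.array(OFF_COLOR, dtype=np.uint8))
    frame[events["y"], events["x"]] = colors


frame = np.zeros((2, 3, 3), np.uint8)  # height=2, width=3, 3 channels
events = np.array([(0, 0, 1, 10), (2, 1, 0, 20)], dtype=EVENT_DTYPE)
render_events(events, frame)
print(frame[0, 0].tolist(), frame[1, 2].tolist(), frame[0, 1].tolist())
```

As in CameraView, the frame buffer is allocated once and overwritten for every slice, which avoids reallocating an image per slice.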
Then, we build the camera system, i.e. the master and slave cameras. To do so, we rely on the SyncedCameraSystemBuilder helper class. We pass all the provided command-line arguments to it and then call the build() method to create the camera system. Internally, the builder determines whether a live or an offline camera system should be created based on the provided arguments, and it builds and configures all the camera instances accordingly:
builder = SyncedCameraSystemBuilder()

def get_settings_file_path(config_dir, serial_number) -> Optional[Path]:
    settings_file_path = Path(config_dir) / f"{serial_number}.json"
    if not settings_file_path.exists():
        return None
    return settings_file_path

for sn in args.camera_serial_numbers:
    print(f"Adding camera with serial number {sn}")
    settings_file_path = get_settings_file_path(args.config_path, sn)
    builder.add_live_camera_parameters(serial_number=sn, settings_file_path=settings_file_path)

builder.set_record(args.record)
builder.set_record_dir(args.record_path)

for record in args.input_event_files:
    builder.add_record_path(record)

hints = FileConfigHints()
hints.real_time_playback(args.real_time_playback)
builder.set_file_config_hints(hints)

[master, slaves] = builder.build()
Next, we create a SyncedCameraStreamsSlicer instance by transferring the ownership of the camera system to it and setting the desired slicing mode:
slicer = SyncedCameraStreamsSlicer(master.move(), [slave.move() for slave in slaves], args.slice_condition)
Note
The camera instances are transferred to the slicer much as they would be in C++, using move semantics. Any variable that held a moved camera instance is invalid after this point and must not be used anymore (doing so is undefined behavior). Under the hood, the move() function returns a special object that holds the ownership of the camera instance. This object can only be used to instantiate a slicer, and an exception is thrown if the user tries to use it more than once.
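The ownership transfer described in this note can be pictured with a small pure-Python sketch of a one-shot handle. The Owner and MovedHandle classes and the take() method are hypothetical names used only for illustration; they are not part of the Metavision API, and unlike the real bindings this sketch raises an error when a moved-from object is reused instead of leaving the behavior undefined.

```python
class MovedHandle:
    """One-shot container that hands a resource over to a single consumer."""

    def __init__(self, resource):
        self._resource = resource

    def take(self):
        """Return the resource the first time, raise on any later call."""
        if self._resource is None:
            raise RuntimeError("resource has already been moved")
        resource, self._resource = self._resource, None
        return resource


class Owner:
    """Hypothetical stand-in for an object that can give up what it wraps."""

    def __init__(self, resource):
        self._resource = resource

    def move(self):
        """Give up the resource and return a handle that now owns it."""
        if self._resource is None:
            raise RuntimeError("this object was already moved from")
        handle = MovedHandle(self._resource)
        self._resource = None
        return handle


owner = Owner("camera")
handle = owner.move()
resource = handle.take()  # the consumer (e.g. a slicer) takes ownership
print(resource)
```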
Next, we build a CameraView instance for each camera:
views = [CameraView(slicer.master(), "Master")]
for i in range(slicer.slaves_count()):
    views.append(CameraView(slicer.slave(i), f"Slave {i}"))
Finally, we start slicing the synchronized event streams by iterating over the slicer. The information about the slices is printed to the console and the slice of each camera is processed and displayed in each view:
for slice in slicer:
    EventLoop.poll_and_dispatch()

    log_slice_info(slice)

    views[0].process(slice.master_events)
    for i in range(slicer.slaves_count()):
        views[i + 1].process(slice.slave_events[i])

    if should_exit(views):
        break
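The log_slice_info and should_exit helpers are not shown on this page. The sketch below gives one plausible shape for them, exercised with stand-in objects so that it runs without the SDK. It assumes that each window exposes a should_close() method matching the set_close_flag() call used in CameraView, and it only relies on the master_events and slave_events fields used above. The helper here is named format_slice_info and returns a string to keep the example easy to check; the sample's own helpers may differ.

```python
from types import SimpleNamespace


def should_exit(views):
    """Return True as soon as any view's window has been asked to close."""
    return any(view.window.should_close() for view in views)


def format_slice_info(slice_):
    """Summarize how many events each camera contributed to the slice."""
    n_master = len(slice_.master_events)
    n_slaves = [len(events) for events in slice_.slave_events]
    return f"master: {n_master} events, slaves: {n_slaves} events"


class FakeWindow:
    """Stand-in for a window, exposing only the close flag (assumption)."""

    def __init__(self):
        self._close = False

    def set_close_flag(self):
        self._close = True

    def should_close(self):
        return self._close


views = [SimpleNamespace(window=FakeWindow()) for _ in range(2)]
fake_slice = SimpleNamespace(master_events=[1, 2, 3], slave_events=[[1], [1, 2]])

info = format_slice_info(fake_slice)
before = should_exit(views)
views[1].window.set_close_flag()  # e.g. the user pressed Q in one window
after = should_exit(views)
print(info, before, after)
```

Closing any single window ends the loop for all cameras, which keeps the views consistent with each other.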