SDK Core Python bindings

In this tutorial, we describe some of the event-processing algorithms available in Python in the metavision_sdk_core module and present their results visually

import metavision_sdk_core
from metavision_sdk_base import EventCD
from metavision_core.event_io import RawReader
from metavision_sdk_core import BaseFrameGenerationAlgorithm

%matplotlib inline
import numpy as np
from matplotlib import pyplot as plt
plt.rcParams['figure.figsize'] = [11, 7]
import os

Load automotive sequence

Here is the link to download the RAW file used in this sample: driving_sample.raw

sequence_filename_raw = "driving_sample.raw"
assert os.path.isfile(sequence_filename_raw)

# Open the RAW recording and query the sensor geometry
mv_raw = RawReader(sequence_filename_raw)
height, width = mv_raw.get_size()
assert width == 1280
assert height == 720

# Load the first 10 ms of events
ev = mv_raw.load_delta_t(10000)

# Render the events into a BGR frame, then display it as RGB
image = np.zeros((height, width, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(ev, image)
plt.imshow(image[..., ::-1])
../../_images/metavision_sdk_core_bindings_4_1.png

ROIFilterAlgorithm

Output absolute coordinates (default)

Coordinates of output events are unchanged (expressed in the frame’s coordinate system)

# Keep only events inside the rectangle x in [750, 1000], y in [250, 450]
roi_filter = metavision_sdk_core.RoiFilterAlgorithm(750, 250, 1000, 450)
roi_events_buf = roi_filter.get_empty_output_buffer()
assert roi_events_buf.numpy().dtype == EventCD

roi_filter.process_events(ev, roi_events_buf)
roi_events_np = roi_events_buf.numpy()


# Output coordinates are unchanged: they stay inside the ROI bounds
# expressed in the full-frame coordinate system
assert 750 <= roi_events_np["x"].min()
assert roi_events_np["x"].max() <= 1000
assert 250 <= roi_events_np["y"].min()
assert roi_events_np["y"].max() <= 450

BaseFrameGenerationAlgorithm.generate_frame(roi_events_np, image)
plt.imshow(image[..., ::-1])
../../_images/metavision_sdk_core_bindings_6_1.png

Output relative coordinates

Coordinates are expressed in the coordinate system of the ROI (top left of the ROI region is (0,0))

# Same ROI idea, but coordinates come out relative to the ROI's top-left corner.
# The bounds [750, 999] x [250, 449] define a 250 x 200 region.
roi_filter_relative = metavision_sdk_core.RoiFilterAlgorithm(750, 250, 999, 449, output_relative_coordinates=True)
relative_events_buf = roi_filter_relative.get_empty_output_buffer()
assert relative_events_buf.numpy().dtype == EventCD

roi_filter_relative.process_events(ev, relative_events_buf)
relative_events_np = relative_events_buf.numpy()


# Relative coordinates start at (0, 0) and stay below the ROI dimensions
assert relative_events_np["x"].min() >= 0
assert relative_events_np["x"].max() < 250
assert relative_events_np["y"].min() >= 0
assert relative_events_np["y"].max() < 200

image_small = np.zeros((201, 251, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(relative_events_np, image_small)
plt.imshow(image_small[..., ::-1])
../../_images/metavision_sdk_core_bindings_8_1.png

FlipXAlgorithm / FlipYAlgorithm

# Mirror events horizontally: each event's x becomes (width - 1) - x
flip_x = metavision_sdk_core.FlipXAlgorithm(width - 1)
flip_x_buf = flip_x.get_empty_output_buffer()

flip_x.process_events(ev, flip_x_buf)
plt.title("Events flipped horizontally")
BaseFrameGenerationAlgorithm.generate_frame(flip_x_buf.numpy(), image)
plt.imshow(image[..., ::-1])
../../_images/metavision_sdk_core_bindings_10_1.png

Processing in-place

Calling process_events() does not change the input events. To perform processing in-place, use the method process_events_() instead.

# Display the unmodified input events before the in-place flip below mutates ev
plt.title("Input events")
BaseFrameGenerationAlgorithm.generate_frame(ev, image)
plt.imshow(image[..., ::-1])
../../_images/metavision_sdk_core_bindings_12_1.png
flip_y = metavision_sdk_core.FlipYAlgorithm(height - 1)
# process_events_() (trailing underscore) mutates its argument: ev is flipped in-place
flip_y.process_events_(ev)
plt.title("Events flipped vertically")
BaseFrameGenerationAlgorithm.generate_frame(ev, image)
plt.imshow(image[..., ::-1])
../../_images/metavision_sdk_core_bindings_13_1.png

Load hand-spinner sequence

Here is the link to download the RAW file used in this sample: spinner.raw

sequence_filename_raw = "spinner.raw"
assert os.path.isfile(sequence_filename_raw)

# Open the hand-spinner recording and query its geometry
mv_raw = RawReader(sequence_filename_raw)
height, width = mv_raw.get_size()
assert width == 640
assert height == 480

ev = mv_raw.load_delta_t(5000)  # load 5 ms of events

image = np.zeros((height, width, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(ev, image)

plt.title("Input events")
plt.imshow(image[..., ::-1])
../../_images/metavision_sdk_core_bindings_15_1.png

PolarityInverterAlgorithm

This is used to invert the polarity of events

# Swap the polarity of every event (positive <-> negative)
polarity_inverter = metavision_sdk_core.PolarityInverterAlgorithm()
filtered_events_buf = polarity_inverter.get_empty_output_buffer()

polarity_inverter.process_events(ev, filtered_events_buf)
BaseFrameGenerationAlgorithm.generate_frame(filtered_events_buf.numpy(), image)
plt.title("Inverted polarity")
plt.imshow(image[..., ::-1])
../../_images/metavision_sdk_core_bindings_17_1.png

PolarityFilterAlgorithm

This is used to filter the events by polarity

polarity_filter = metavision_sdk_core.PolarityFilterAlgorithm(1)  # keep only positive events
filtered_events_buf = polarity_filter.get_empty_output_buffer()

polarity_filter.process_events(ev, filtered_events_buf)
BaseFrameGenerationAlgorithm.generate_frame(filtered_events_buf.numpy(), image)
plt.title("Keep only positive events")
plt.imshow(image[..., ::-1])
../../_images/metavision_sdk_core_bindings_19_1.png
# Reuse the same filter instance: flip its polarity attribute to keep only negative events
polarity_filter.polarity = 0
polarity_filter.process_events(ev, filtered_events_buf)
BaseFrameGenerationAlgorithm.generate_frame(filtered_events_buf.numpy(), image)
plt.title("Keep only negative events")
plt.imshow(image[..., ::-1])
../../_images/metavision_sdk_core_bindings_20_1.png

Load counting sequence and select a Region Of Interest

Here is the link to download the RAW file used in this sample: 80_balls.raw

sequence_filename_raw = "80_balls.raw"
assert os.path.isfile(sequence_filename_raw)

mv_raw = RawReader(sequence_filename_raw)
height, width = mv_raw.get_size()
assert width == 640
assert height == 480

ev = mv_raw.load_n_events(3000)  # discard the first 3000 events

ev = mv_raw.load_n_events(3000)  # keep the next 3000 events
ev["t"] -= ev["t"].min()  # shift timestamps so this chunk starts at t = 0

image = np.zeros((height, width, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(ev, image)

plt.title("Input events")
plt.imshow(image[..., ::-1])
../../_images/metavision_sdk_core_bindings_22_1.png
# Select a 51 x 51 region ([300, 350] x [0, 50]) with ROI-relative coordinates
roi_filter = metavision_sdk_core.RoiFilterAlgorithm(300, 0, 350, 50, output_relative_coordinates=True)
ev_roi_buffer = roi_filter.get_empty_output_buffer()
roi_filter.process_events(ev, ev_roi_buffer)
roi_width, roi_height = 51, 51

image_roi = np.zeros((roi_height, roi_width, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(ev_roi_buffer.numpy(), image_roi)

plt.title("Input events in ROI")
plt.imshow(image_roi[..., ::-1])
../../_images/metavision_sdk_core_bindings_23_1.png

TimeSurfaceProcessor

We can compute a Time-Surface on this chunk of events. It can be computed either by separating the positive and negative events, or by merging them in the same channel.

Separating positive and negative events

last_processed_timestamp = 0
# Two-channel time-surface: channel 0 holds the most recent timestamp of
# negative events, channel 1 of positive events.
# FIX: rows/cols were swapped (rows=roi_width, cols=roi_height); rows must be
# the image height, matching the single-channel buffer below. The original
# only worked by accident because the ROI is square (51 x 51).
time_surface_double_channel = metavision_sdk_core.MostRecentTimestampBuffer(rows=roi_height,
                                                                            cols=roi_width,
                                                                            channels=2)

ts_prod_double_channel = metavision_sdk_core.TimeSurfaceProducerAlgorithmSplitPolarities(width=roi_width,
                                                                                         height=roi_height)


def callback_double_channel(ts, time_surface):
    """Copy the produced time-surface and remember the last processed timestamp."""
    global last_processed_timestamp
    global time_surface_double_channel
    last_processed_timestamp = ts
    time_surface_double_channel.numpy()[...] = time_surface.numpy()[...]


ts_prod_double_channel.set_output_callback(callback_double_channel)

ts_prod_double_channel.process_events(ev_roi_buffer)
plt.title("TimeSurface positive events")
plt.imshow(time_surface_double_channel.numpy()[:,:,1])
../../_images/metavision_sdk_core_bindings_25_1.png
# Channel 0 holds the most recent timestamps of the negative events
plt.title("TimeSurface negative events")
plt.imshow(time_surface_double_channel.numpy()[..., 0])
../../_images/metavision_sdk_core_bindings_26_1.png

Merging positive and negative events

last_processed_timestamp = 0
# Single-channel time-surface: positive and negative events share one channel
time_surface_single_channel = metavision_sdk_core.MostRecentTimestampBuffer(rows=roi_height,
                                                                            cols=roi_width,
                                                                            channels=1)

ts_prod_single_channel = metavision_sdk_core.TimeSurfaceProducerAlgorithmMergePolarities(width=roi_width,
                                                                                         height=roi_height)


def callback_single_channel(ts, time_surface):
    """Copy the produced time-surface and remember the last processed timestamp."""
    global last_processed_timestamp
    global time_surface_single_channel
    last_processed_timestamp = ts
    time_surface_single_channel.numpy()[...] = time_surface.numpy()[...]


ts_prod_single_channel.set_output_callback(callback_single_channel)

ts_prod_single_channel.process_events(ev_roi_buffer)
plt.title("TimeSurface")
# FIX: removed an unused `min_time` computation (min of the non-zero
# timestamps) that had no effect on the displayed output
plt.imshow(time_surface_single_channel.numpy())
../../_images/metavision_sdk_core_bindings_28_1.png

Event preprocessing : EventPreprocessor

EventPreprocessor is a C++ implementation of the function described in the event preprocessing tutorial.

# the visualization functions from metavision_ml can be used.
from metavision_ml.preprocessing import viz_histo

We create an EventPreprocessor function with:

  • histo

  • delta_t = 50ms

  • network input size as a half of the size of the event_frame

mv_raw = RawReader(sequence_filename_raw)
height, width = mv_raw.get_size()
mv_raw.seek_time(3e6)  # jump to t = 3 s in the recording

ev = mv_raw.load_delta_t(50000)  # one 50 ms slice of events

# Histogram preprocessor whose network input is half the sensor resolution
evt_preproc = metavision_sdk_core.EventPreprocessor.create_HistoProcessor(delta_t=50000,
                                                                          network_input_width=width // 2,
                                                                          network_input_height=height // 2,
                                                                          event_input_width=width,
                                                                          event_input_height=height)

frame_buffer = evt_preproc.init_output_tensor()
evt_preproc.process_events(3000000, ev, frame_buffer)
print("frame_buffer shape        : ", frame_buffer.shape)
print("number of non-zero values : ", np.sum(frame_buffer != 0))
print("set of unique values      :\n", np.around(np.unique(frame_buffer), decimals=2))
plt.imshow(viz_histo(frame_buffer))
frame_buffer shape        :  (2, 240, 320)
number of non-zero values :  13572
set of unique values      :
 [0.   0.05 0.1  0.15 0.2  0.25 0.3  0.35 0.4  0.45 0.5  0.55 0.6  0.65
 0.7  0.75 0.8  0.85 0.9  0.95 1.  ]
tutorials/core/metavision_sdk_core_bindings_files/metavision_sdk_core_bindings_42_2.png
ev = mv_raw.load_delta_t(50000)  # next 50 ms slice

# Zero the tensor so the same frame_buffer can be reused for this slice
frame_buffer.fill(0)
evt_preproc.process_events(3050000, ev, frame_buffer)
print("frame_buffer shape        : ", frame_buffer.shape)
print("number of non-zero values : ", np.sum(frame_buffer != 0))
print("set of unique values      :\n", np.around(np.unique(frame_buffer), decimals=2))
plt.imshow(viz_histo(frame_buffer))
frame_buffer shape        :  (2, 240, 320)
number of non-zero values :  13594
set of unique values      :
 [0.   0.05 0.1  0.15 0.2  0.25 0.3  0.35 0.4  0.45 0.5  0.55 0.6  0.65
 0.7  0.75 0.8  0.85 0.9  0.95 1.  ]
tutorials/core/metavision_sdk_core_bindings_files/metavision_sdk_core_bindings_43_2.png