SDK Core Python bindings
In this tutorial, we describe some of the processing algorithms available in Python
in the module metavision_sdk_core
and present their results visually.
import metavision_sdk_core
from metavision_sdk_base import EventCD
from metavision_core.event_io import RawReader
from metavision_sdk_core import BaseFrameGenerationAlgorithm
%matplotlib inline
import numpy as np
from matplotlib import pyplot as plt
plt.rcParams['figure.figsize'] = [11, 7]
import os
Load automotive sequence
Here is the link to download the RAW file used in this sample: driving_sample.raw
# Open the automotive recording; the RAW file is expected in the working directory.
sequence_filename_raw = "driving_sample.raw"
assert os.path.isfile(sequence_filename_raw)
mv_raw = RawReader(sequence_filename_raw)

# get_size() is unpacked as (height, width); this sample is expected to be 1280x720.
height, width = mv_raw.get_size()
assert width == 1280 and height == 720

# Load a 10000-long time slice of events (10 ms, assuming microsecond timestamps)
# and render it into a 3-channel 8-bit frame.
ev = mv_raw.load_delta_t(10000)
image = np.zeros(shape=(height, width, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(ev, image)

# The channel axis is reversed for display (presumably BGR -> RGB; confirm in the SDK docs).
plt.imshow(image[..., ::-1])
RoiFilterAlgorithm
Output absolute coordinates (default)
Coordinates of output events are unchanged (expressed in the frame’s coordinate system)
# Keep only the events inside the window x in [750, 1000], y in [250, 450].
roi_filter = metavision_sdk_core.RoiFilterAlgorithm(750, 250, 1000, 450)

# The output buffer is allocated by the algorithm and exposes EventCD records.
filtered_events_buffer = roi_filter.get_empty_output_buffer()
assert filtered_events_buffer.numpy().dtype == EventCD

roi_filter.process_events(ev, filtered_events_buffer)
filtered_events_np = filtered_events_buffer.numpy()

# Default mode: coordinates stay in the sensor frame, so they must lie within the ROI bounds.
assert 750 <= filtered_events_np["x"].min()
assert 1000 >= filtered_events_np["x"].max()
assert 250 <= filtered_events_np["y"].min()
assert 450 >= filtered_events_np["y"].max()

# Render the filtered events into the full-size frame and display it.
BaseFrameGenerationAlgorithm.generate_frame(filtered_events_np, image)
plt.imshow(image[..., ::-1])
Output relative coordinates
Coordinates are expressed in the coordinate system of the ROI (top left of the ROI region is (0,0))
# ROI spanning x in [750, 999] and y in [250, 449]: 250 columns by 200 rows
# (the bounds are treated as inclusive, as the assertions below rely on).
roi_filter_relative = metavision_sdk_core.RoiFilterAlgorithm(750, 250, 999, 449, output_relative_coordinates=True)
filtered_events_buffer = roi_filter_relative.get_empty_output_buffer()
assert filtered_events_buffer.numpy().dtype == EventCD
roi_filter_relative.process_events(ev, filtered_events_buffer)
filtered_events_np = filtered_events_buffer.numpy()
# With relative output, coordinates are offset so that the ROI's top-left corner is (0, 0).
assert filtered_events_np["x"].min() >= 0
assert filtered_events_np["x"].max() < 250
assert filtered_events_np["y"].min() >= 0
assert filtered_events_np["y"].max() < 200
# The frame matches the ROI exactly: 200 rows x 250 columns.
# (It was previously allocated one pixel too large in each dimension.)
image_small = np.zeros((200, 250, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(filtered_events_np, image_small)
plt.imshow(image_small[:,:,::-1])
FlipXAlgorithm / FlipYAlgorithm
# Mirror the events horizontally. The algorithm is constructed with width - 1,
# i.e. the largest x index of the sensor.
flip_x = metavision_sdk_core.FlipXAlgorithm(width - 1)

# process_events() writes the result to a separate buffer and leaves `ev` untouched.
flipped_events_buffer = flip_x.get_empty_output_buffer()
flip_x.process_events(ev, flipped_events_buffer)

BaseFrameGenerationAlgorithm.generate_frame(flipped_events_buffer.numpy(), image)
plt.title("Events flipped horizontally")
plt.imshow(image[..., ::-1])
Processing in-place
Calling process_events()
does not change the input events. To
perform processing in-place, use the method process_events_()
instead.
# First cell: render the events as they are, before any in-place processing.
plt.title("Input events")
BaseFrameGenerationAlgorithm.generate_frame(ev, image)
plt.imshow(image[:,:,::-1])
# Second cell: flip vertically. The algorithm is constructed with height - 1,
# i.e. the largest y index. Note the trailing underscore in process_events_():
# the result overwrites `ev` itself, no output buffer is involved.
flip_y = metavision_sdk_core.FlipYAlgorithm(height - 1)
flip_y.process_events_(ev)
# input events have been processed in-place
plt.title("Events flipped vertically")
BaseFrameGenerationAlgorithm.generate_frame(ev, image)
plt.imshow(image[:,:,::-1])
Load hand-spinner sequence
Here is the link to download the RAW file used in this sample: spinner.raw
# Open the hand-spinner recording; the RAW file is expected in the working directory.
sequence_filename_raw = "spinner.raw"
assert os.path.isfile(sequence_filename_raw)
mv_raw = RawReader(sequence_filename_raw)

# get_size() is unpacked as (height, width); this sample is expected to be 640x480.
height, width = mv_raw.get_size()
assert width == 640 and height == 480

ev = mv_raw.load_delta_t(5000)  # 5 ms

# Fresh frame matching the new sensor geometry (replaces the 1280x720 one used before).
image = np.zeros(shape=(height, width, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(ev, image)
plt.title("Input events")
plt.imshow(image[..., ::-1])
PolarityInverterAlgorithm
This is used to invert the polarity of events
# Swap the polarity of every event, writing the result into a dedicated output buffer.
polarity_inverter = metavision_sdk_core.PolarityInverterAlgorithm()
filtered_events_buf = polarity_inverter.get_empty_output_buffer()
polarity_inverter.process_events(ev, filtered_events_buf)

# Render the inverted events into the shared frame and display it.
BaseFrameGenerationAlgorithm.generate_frame(filtered_events_buf.numpy(), image)
plt.title("Inverted polarity")
plt.imshow(image[..., ::-1])
PolarityFilterAlgorithm
This is used to filter the events by polarity
# The constructor argument selects the polarity to keep: 1 = positive, 0 = negative.
polarity_filter = metavision_sdk_core.PolarityFilterAlgorithm(1)  # keep only positive events
filtered_events_buf = polarity_filter.get_empty_output_buffer()
polarity_filter.process_events(ev, filtered_events_buf)
BaseFrameGenerationAlgorithm.generate_frame(filtered_events_buf.numpy(), image)
plt.title("Keep only positive events")
plt.imshow(image[:,:,::-1])
# Next cell: the same filter instance and output buffer are reused; only the
# selected polarity is changed through the `polarity` attribute.
polarity_filter.polarity = 0 # keep only negative events
polarity_filter.process_events(ev, filtered_events_buf)
BaseFrameGenerationAlgorithm.generate_frame(filtered_events_buf.numpy(), image)
plt.title("Keep only negative events")
plt.imshow(image[:,:,::-1])
Load counting sequence and select a Region Of Interest
Here is the link to download the RAW file used in this sample: 80_balls.raw
# Open the counting recording; the RAW file is expected in the working directory.
sequence_filename_raw = "80_balls.raw"
assert os.path.isfile(sequence_filename_raw)
mv_raw = RawReader(sequence_filename_raw)

# get_size() is unpacked as (height, width); this sample is expected to be 640x480.
height, width = mv_raw.get_size()
assert width == 640 and height == 480

# Two consecutive reads of 3000 events: the first batch is thrown away,
# only the second one is kept for processing.
ev = mv_raw.load_n_events(3000)  # discard first 3000 events
ev = mv_raw.load_n_events(3000)  # process next 3000 events

# Shift timestamps so that the earliest event of the batch is at t = 0.
ev["t"] -= ev["t"].min()

image = np.zeros(shape=(height, width, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(ev, image)
plt.title("Input events")
plt.imshow(image[..., ::-1])
# Crop to the window x in [300, 350], y in [0, 50]; output coordinates are relative to the ROI.
roi_filter = metavision_sdk_core.RoiFilterAlgorithm(300, 0, 350, 50, output_relative_coordinates=True)
ev_roi_buffer = roi_filter.get_empty_output_buffer()
roi_filter.process_events(ev, ev_roi_buffer)

# 300..350 and 0..50 with both ends included give a 51 x 51 pixel region.
roi_width = 51
roi_height = 51

image_roi = np.zeros(shape=(roi_height, roi_width, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(ev_roi_buffer.numpy(), image_roi)
plt.title("Input events in ROI")
plt.imshow(image_roi[..., ::-1])
TimeSurfaceProducerAlgorithm
We can compute a Time-Surface on this chunk of events. It can be computed either by separating the positive and negative events, or by merging them in the same channel.
Separating positive and negative events
# Timestamp passed to the most recent callback invocation (updated by the callback below).
last_processed_timestamp = 0

# Destination buffer for the split-polarity time surface: one channel per polarity.
# rows is the image height and cols the image width. These were previously swapped,
# which only went unnoticed because the ROI is square.
time_surface_double_channel = metavision_sdk_core.MostRecentTimestampBuffer(rows=roi_height,
                                                                            cols=roi_width,
                                                                            channels=2)
ts_prod_double_channel = metavision_sdk_core.TimeSurfaceProducerAlgorithmSplitPolarities(width=roi_width,
                                                                                         height=roi_height)


def callback_double_channel(ts, time_surface):
    """Record the callback timestamp and copy the produced time surface.

    Args:
        ts: timestamp handed over by the producer.
        time_surface: buffer produced by the algorithm; its contents are copied
            element-wise into ``time_surface_double_channel``.
    """
    global last_processed_timestamp
    global time_surface_double_channel
    last_processed_timestamp = ts
    # Copy in place rather than rebinding, so the module-level buffer object is preserved.
    time_surface_double_channel.numpy()[...] = time_surface.numpy()[...]


ts_prod_double_channel.set_output_callback(callback_double_channel)
ts_prod_double_channel.process_events(ev_roi_buffer)

# Channel 1 is displayed as the positive-event surface, channel 0 as the negative one.
plt.title("TimeSurface positive events")
plt.imshow(time_surface_double_channel.numpy()[:,:,1])
plt.title("TimeSurface negative events")
plt.imshow(time_surface_double_channel.numpy()[:,:,0])
Merging positive and negative events
# Reset the timestamp tracker before running the merged-polarity producer.
last_processed_timestamp = 0

# Single-channel destination buffer: both polarities end up in the same plane.
time_surface_single_channel = metavision_sdk_core.MostRecentTimestampBuffer(
    rows=roi_height,
    cols=roi_width,
    channels=1,
)
ts_prod_single_channel = metavision_sdk_core.TimeSurfaceProducerAlgorithmMergePolarities(
    width=roi_width,
    height=roi_height,
)


def callback_single_channel(ts, time_surface):
    """Store the callback timestamp and copy the time surface into the module-level buffer."""
    global last_processed_timestamp, time_surface_single_channel
    last_processed_timestamp = ts
    # Element-wise copy into the existing buffer; the buffer object itself is not rebound.
    time_surface_single_channel.numpy()[...] = time_surface.numpy()


ts_prod_single_channel.set_output_callback(callback_single_channel)
ts_prod_single_channel.process_events(ev_roi_buffer)

plt.title("TimeSurface")
# Smallest non-zero timestamp in the surface (computed here but not used by the plot below).
min_time = time_surface_single_channel.numpy()[time_surface_single_channel.numpy() != 0].min()
plt.imshow(time_surface_single_channel.numpy())
Event preprocessing : EventPreprocessor
EventPreprocessor is a C++ implementation of the function described in the event preprocessing tutorial.
# the visualization functions from metavision_ml can be used.
from metavision_ml.preprocessing import viz_histo
We create an EventPreprocessor
with the following configuration:
preprocessing function: histo
delta_t = 50 ms
network input size equal to half the size of the event frame (in each dimension)
# Re-open the current sequence and jump to t = 3e6 before reading a 50000-long slice
# (50 ms, per the delta_t stated in the text above).
mv_raw = RawReader(sequence_filename_raw)
height, width = mv_raw.get_size()
mv_raw.seek_time(3e6)
ev = mv_raw.load_delta_t(50000)

# Histogram preprocessor: events come in at sensor resolution, the output tensor
# is produced at half that resolution in each dimension.
evt_preproc = metavision_sdk_core.EventPreprocessor.create_HistoProcessor(
    delta_t=50000,
    event_input_width=width,
    event_input_height=height,
    network_input_width=width // 2,
    network_input_height=height // 2,
)
frame_buffer = evt_preproc.init_output_tensor()

# The first argument matches the seek position (3000000); presumably it is the
# start timestamp of the slice being processed - confirm against the SDK docs.
evt_preproc.process_events(3000000, ev, frame_buffer)

print("frame_buffer shape : ", frame_buffer.shape)
print("number of non-zero values : ", np.sum(frame_buffer != 0))
print("set of unique values :\n", np.around(np.unique(frame_buffer), decimals=2))
plt.imshow(viz_histo(frame_buffer))
frame_buffer shape : (2, 240, 320)
number of non-zero values : 13572
set of unique values :
[0. 0.05 0.1 0.15 0.2 0.25 0.3 0.35 0.4 0.45 0.5 0.55 0.6 0.65
0.7 0.75 0.8 0.85 0.9 0.95 1. ]
# Read the next 50000-long slice; the reader continues from where the previous read stopped.
ev = mv_raw.load_delta_t(50000)
# reset and reuse the frame_buffer
frame_buffer.fill(0)
# 3050000 = 3000000 + 50000: the first argument is advanced by one slice length
# relative to the previous call.
evt_preproc.process_events(3050000, ev, frame_buffer)
print("frame_buffer shape : ", frame_buffer.shape)
print("number of non-zero values : ", np.sum(frame_buffer != 0))
print("set of unique values :\n", np.around(np.unique(frame_buffer), decimals=2))
plt.imshow(viz_histo(frame_buffer))
frame_buffer shape : (2, 240, 320)
number of non-zero values : 13594
set of unique values :
[0. 0.05 0.1 0.15 0.2 0.25 0.3 0.35 0.4 0.45 0.5 0.55 0.6 0.65
0.7 0.75 0.8 0.85 0.9 0.95 1. ]