SDK Core Python bindings
In this notebook, we demonstrate some of the algorithms available in the Python module metavision_sdk_core and visualize their results.
import metavision_sdk_core
from metavision_sdk_base import EventCD
from metavision_core.event_io import RawReader
from metavision_sdk_core import BaseFrameGenerationAlgorithm
# get_sample downloads the requested file from Prophesee's public sample server if it is not already present locally
from metavision_core.utils import get_sample
%matplotlib inline
import numpy as np
from matplotlib import pyplot as plt
plt.rcParams['figure.figsize'] = [11, 7]
import os
Load automotive sequence
sequence_filename_raw = "driving_sample.raw"
get_sample(sequence_filename_raw, folder=".")
assert os.path.isfile(sequence_filename_raw)
mv_raw = RawReader(sequence_filename_raw)
height, width = mv_raw.get_size()
assert (width, height) == (1280, 720)
ev = mv_raw.load_delta_t(10000)
image = np.zeros((height, width, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(ev, image)
plt.imshow(image[:,:,::-1])
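The events returned by load_delta_t() form a NumPy structured array of EventCD records. As a quick sanity check (a minimal sketch using only the ev array loaded above), we can inspect its fields:
# ev is a structured NumPy array; each record holds x, y, p (polarity) and t (timestamp in us)
print(ev.dtype.names)                                   # expected: ('x', 'y', 'p', 't')
print("number of events:", ev.size)
print("time span (us):", ev["t"].min(), "to", ev["t"].max())
print("polarity values:", np.unique(ev["p"]))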
RoiFilterAlgorithm
Output absolute coordinates (default)
Coordinates of the output events are left unchanged (expressed in the full frame's coordinate system)
roi_filter = metavision_sdk_core.RoiFilterAlgorithm(750, 250, 1000, 450)
filtered_events_buffer = roi_filter.get_empty_output_buffer()
assert filtered_events_buffer.numpy().dtype == EventCD
roi_filter.process_events(ev, filtered_events_buffer)
filtered_events_np = filtered_events_buffer.numpy()
assert filtered_events_np["x"].min() >= 750
assert filtered_events_np["x"].max() <= 1000
assert filtered_events_np["y"].min() >= 250
assert filtered_events_np["y"].max() <= 450
BaseFrameGenerationAlgorithm.generate_frame(filtered_events_np, image)
plt.imshow(image[:,:,::-1])
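As a cross-check (this is not part of the SDK API), the same selection can be reproduced with a plain NumPy boolean mask; assuming the ROI bounds are inclusive on both ends, as the assertions above suggest, the two event counts should agree:
# reproduce the ROI selection directly on the input event array
mask = ((ev["x"] >= 750) & (ev["x"] <= 1000) &
        (ev["y"] >= 250) & (ev["y"] <= 450))
print("NumPy mask:", mask.sum(), "events / RoiFilterAlgorithm:", filtered_events_np.size, "events")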
Output relative coordinates
Coordinates are expressed in the coordinate system of the ROI (the top-left corner of the ROI is (0, 0))
roi_filter_relative = metavision_sdk_core.RoiFilterAlgorithm(750, 250, 999, 449, output_relative_coordinates=True)
filtered_events_buffer = roi_filter_relative.get_empty_output_buffer()
assert filtered_events_buffer.numpy().dtype == EventCD
roi_filter_relative.process_events(ev, filtered_events_buffer)
filtered_events_np = filtered_events_buffer.numpy()
assert filtered_events_np["x"].min() >= 0
assert filtered_events_np["x"].max() < 250
assert filtered_events_np["y"].min() >= 0
assert filtered_events_np["y"].max() < 200
image_small = np.zeros((200, 250, 3), dtype=np.uint8)  # the ROI spans 250 x 200 pixels
BaseFrameGenerationAlgorithm.generate_frame(filtered_events_np, image_small)
plt.imshow(image_small[:,:,::-1])
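If the filtered events are needed back in full-frame coordinates (for instance to overlay the ROI on the original image), the ROI offset can simply be added back. A minimal sketch, working on a copy of the filtered events:
# shift the relative coordinates back to the full-frame coordinate system
ev_absolute = filtered_events_np.copy()
ev_absolute["x"] += 750   # x offset of the ROI
ev_absolute["y"] += 250   # y offset of the ROI
assert ev_absolute["x"].min() >= 750 and ev_absolute["y"].min() >= 250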
FlipXAlgorithm / FlipYAlgorithm
flip_x = metavision_sdk_core.FlipXAlgorithm(width - 1)
flipped_events_buffer = flip_x.get_empty_output_buffer()
flip_x.process_events(ev, flipped_events_buffer)
plt.title("Events flipped horizontally")
BaseFrameGenerationAlgorithm.generate_frame(flipped_events_buffer.numpy(), image)
plt.imshow(image[:,:,::-1])
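The horizontal flip simply maps each x coordinate to width - 1 - x. A quick check, under the assumption that the output preserves the input event order:
flipped_np = flipped_events_buffer.numpy()
# every x should be mirrored around the sensor centre; y coordinates are untouched
assert np.array_equal(flipped_np["x"], width - 1 - ev["x"])
assert np.array_equal(flipped_np["y"], ev["y"])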
Processing in-place
Calling process_events() does not change the input events. To process the events in-place, use the method process_events_() instead.
plt.title("Input events")
BaseFrameGenerationAlgorithm.generate_frame(ev, image)
plt.imshow(image[:,:,::-1])
flip_y = metavision_sdk_core.FlipYAlgorithm(height - 1)
flip_y.process_events_(ev)
# input events have been processed in-place
plt.title("Events flipped vertically")
BaseFrameGenerationAlgorithm.generate_frame(ev, image)
plt.imshow(image[:,:,::-1])
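Because process_events_() overwrites its input, a common pattern is to keep a copy of the original array before processing in-place. A minimal sketch (note that ev has already been flipped vertically at this point, so flipping it again restores the original orientation):
ev_backup = ev.copy()                 # untouched copy of the current events
flip_y.process_events_(ev)            # flip in-place a second time
assert np.array_equal(ev["y"], height - 1 - ev_backup["y"])   # y mirrored with respect to the backup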
Load hand-spinner sequence
sequence_filename_raw = "spinner.raw"
get_sample(sequence_filename_raw, folder=".")
assert os.path.isfile(sequence_filename_raw)
mv_raw = RawReader(sequence_filename_raw)
height, width = mv_raw.get_size()
assert (width, height) == (640, 480)
ev = mv_raw.load_delta_t(5000) # 5 ms
image = np.zeros((height, width, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(ev, image)
plt.title("Input events")
plt.imshow(image[:,:,::-1])
PolarityInverterAlgorithm
This is used to invert the polarity of events
polarity_inverter = metavision_sdk_core.PolarityInverterAlgorithm()
filtered_events_buf = polarity_inverter.get_empty_output_buffer()
polarity_inverter.process_events(ev, filtered_events_buf)
BaseFrameGenerationAlgorithm.generate_frame(filtered_events_buf.numpy(), image)
plt.title("Inverted polarity")
plt.imshow(image[:,:,::-1])
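Assuming polarities are encoded as 0 (negative) and 1 (positive), the inversion can be checked against the input with plain NumPy: the number of positive events in the output should match the number of negative events in the input, and vice versa.
inverted_np = filtered_events_buf.numpy()
assert (inverted_np["p"] == 1).sum() == (ev["p"] == 0).sum()
assert (inverted_np["p"] == 0).sum() == (ev["p"] == 1).sum()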
PolarityFilterAlgorithm
This is used to filter the events by polarity
polarity_filter = metavision_sdk_core.PolarityFilterAlgorithm(1)  # keep only positive events
filtered_events_buf = polarity_filter.get_empty_output_buffer()
polarity_filter.process_events(ev, filtered_events_buf)
BaseFrameGenerationAlgorithm.generate_frame(filtered_events_buf.numpy(), image)
plt.title("Keep only positive events")
plt.imshow(image[:,:,::-1])
polarity_filter.polarity = 0 # keep only negative events
polarity_filter.process_events(ev, filtered_events_buf)
BaseFrameGenerationAlgorithm.generate_frame(filtered_events_buf.numpy(), image)
plt.title("Keep only negative events")
plt.imshow(image[:,:,::-1])
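The same selection can be written directly on the structured array; a minimal NumPy sketch equivalent to the negative-polarity filter above (assuming negative events are encoded with polarity 0, as in the previous cell):
# plain NumPy equivalent of PolarityFilterAlgorithm with polarity 0
ev_neg = ev[ev["p"] == 0]
assert ev_neg.size == filtered_events_buf.numpy().size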
Load counting sequence and select a Region Of Interest
sequence_filename_raw = "80_balls.raw"
get_sample(sequence_filename_raw, folder=".")
assert os.path.isfile(sequence_filename_raw)
mv_raw = RawReader(sequence_filename_raw)
height, width = mv_raw.get_size()
assert (width, height) == (640, 480)
ev = mv_raw.load_n_events(3000) # discard first 3000 events
ev = mv_raw.load_n_events(3000) # process next 3000 events
ev["t"] -= ev["t"].min()
image = np.zeros((height, width, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(ev, image)
plt.title("Input events")
plt.imshow(image[:,:,::-1])
roi_filter = metavision_sdk_core.RoiFilterAlgorithm(300, 0, 350, 50, output_relative_coordinates=True)
ev_roi_buffer = roi_filter.get_empty_output_buffer()
roi_filter.process_events(ev, ev_roi_buffer)
roi_width, roi_height = 51, 51
image_roi = np.zeros((roi_height, roi_width, 3), dtype=np.uint8)
BaseFrameGenerationAlgorithm.generate_frame(ev_roi_buffer.numpy(), image_roi)
plt.title("Input events in ROI")
plt.imshow(image_roi[:,:,::-1])
TimeSurfaceProducerAlgorithm
We can compute a Time-Surface on this chunk of events. It can be computed either by separating the positive and negative events, or by merging them in the same channel.
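For intuition only, here is a plain-NumPy sketch of the idea on the ROI events computed above: a single-channel time surface stores, for every pixel, the timestamp of the most recent event seen at that pixel.
# naive single-channel time surface: most recent (i.e. maximum) timestamp per pixel
ev_roi = ev_roi_buffer.numpy()
naive_ts = np.zeros((roi_height, roi_width), dtype=np.int64)
np.maximum.at(naive_ts, (ev_roi["y"], ev_roi["x"]), ev_roi["t"])
plt.title("Naive time surface (NumPy)")
plt.imshow(naive_ts)   # pixels that never received an event stay at 0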
Separating positive and negative events
last_processed_timestamp = 0
time_surface_double_channel = metavision_sdk_core.MostRecentTimestampBuffer(rows=roi_height,
                                                                             cols=roi_width,
                                                                             channels=2)
ts_prod_double_channel = metavision_sdk_core.TimeSurfaceProducerAlgorithmSplitPolarities(width=roi_width,
                                                                                          height=roi_height)
# The producer calls this callback with the timestamp of the last processed event and its
# internal time-surface buffer; we copy the buffer so it can be used outside the callback.
def callback_double_channel(ts, time_surface):
    global last_processed_timestamp
    global time_surface_double_channel
    last_processed_timestamp = ts
    time_surface_double_channel.numpy()[...] = time_surface.numpy()[...]
ts_prod_double_channel.set_output_callback(callback_double_channel)
ts_prod_double_channel.process_events(ev_roi_buffer)
plt.title("TimeSurface positive events")
plt.imshow(time_surface_double_channel.numpy()[:,:,1])
plt.title("TimeSurface negative events")
plt.imshow(time_surface_double_channel.numpy()[:,:,0])
Merging positive and negative events
last_processed_timestamp = 0
time_surface_single_channel = metavision_sdk_core.MostRecentTimestampBuffer(rows=roi_height,
                                                                             cols=roi_width,
                                                                             channels=1)
ts_prod_single_channel = metavision_sdk_core.TimeSurfaceProducerAlgorithmMergePolarities(width=roi_width,
                                                                                          height=roi_height)
def callback_single_channel(ts, time_surface):
    global last_processed_timestamp
    global time_surface_single_channel
    last_processed_timestamp = ts
    time_surface_single_channel.numpy()[...] = time_surface.numpy()[...]
ts_prod_single_channel.set_output_callback(callback_single_channel)
ts_prod_single_channel.process_events(ev_roi_buffer)
plt.title("TimeSurface")
# smallest non-zero timestamp, used as the lower bound of the color scale so that
# pixels without events do not dominate the display
min_time = time_surface_single_channel.numpy()[time_surface_single_channel.numpy() != 0].min()
plt.imshow(time_surface_single_channel.numpy(), vmin=min_time)
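A common way to display a time surface is to convert the raw timestamps into an exponentially decayed image, so that recent events appear bright and older ones fade out. A minimal sketch with an arbitrary decay constant tau of 10000 us (the constant and the zero handling below are illustrative choices, not part of the SDK):
ts_np = np.squeeze(time_surface_single_channel.numpy()).astype(np.float64)
tau = 10000.0                              # decay constant in us (arbitrary choice)
current_time = last_processed_timestamp    # timestamp reported by the callback
decayed = np.exp(-(current_time - ts_np) / tau)
decayed[ts_np == 0] = 0                    # leave pixels without events dark
plt.title("Exponentially decayed time surface")
plt.imshow(decayed, cmap="gray")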