Tutorial 7: Get the events with the Python Consumer

In this tutorial, we will see how to get access to the events for direct processing in Python.

All the components presented so far process the received events internally, without giving you direct access to them. This is the main philosophy of Metavision Designer: fast prototyping of event-based pipelines. However, it is sometimes useful to access the events directly in order to do some simple processing. This is possible using the Metavision Designer class PythonConsumer.

Let’s see how.

First, we need to import the basic libraries for processing. Note that, in this tutorial, we use OpenCV to visualize the frames. For this reason, you need to have the opencv-python library installed.

from os import path
import sys
import metavision_designer_engine as mvd_engine
import metavision_designer_core as mvd_core
import metavision_hal as mv_hal

try:
    import cv2
except ImportError:
    raise RuntimeError('To run this sample, you need to install the opencv-python library')

The processing class

The PythonConsumer class works using callbacks. The idea is the following: we define a function that is automatically called by the controller every time new events are available.

The first step is to create this callback function. For convenience and to store information between separate calls, it is common to create a class to which this function belongs.
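Before defining the full class, here is a minimal sketch of the bare signature the controller expects for this callback (the function name and the print statement are only illustrative; the content of the two dictionaries is detailed later in this tutorial):

def on_new_buffer(ts, src_events, src_2d_arrays):
    # ts: timestamp marking the end of the current buffer
    # src_events / src_2d_arrays: dictionaries keyed by the names passed
    # to PythonConsumer.add_source()
    print("Callback triggered at ts = {}".format(ts))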

Let’s define an EventProcessor class:

class EventProcessor:
    """
    Simple wrapper around Python Consumer
    """

    def __init__(self, event_gen_name, frame_gen_name):
        """
        Constructor
        """
        self.__event_gen_name = event_gen_name
        self.__frame_gen_name = frame_gen_name
        self.__frame = None

    def draw_frame(self):
        """
        Called from the main thread to display the frame
        """
        if self.__frame is None:
            # No frame received yet, nothing to display
            return
        cv2.imshow('Events Display OpenCV', self.__frame)
        cv2.waitKey(1)   # 1 ms to yield the UI

    def event_callback(self, ts, src_events, src_2d_arrays):
        """
        Python Callback for PythonConsumer component in a
        Metavision Designer pipeline
        """
        if self.__event_gen_name in src_events:
            # the type of the events contained in this buffer (e.g. Event2d)
            event_type = src_events[self.__event_gen_name][0]
            # the encoding information (Numpy dtype). This is for information only,
            # as the event data below is already decoded
            dtype_obj = src_events[self.__event_gen_name][1]
            # the actual event buffer data
            event_buffer = src_events[self.__event_gen_name][2]
            # print the number of events in this callback and the first event
            if len(event_buffer) != 0:
                print("This callback contains {} events. The first event is {}".format(
                    len(event_buffer), event_buffer[0]))

        if self.__frame_gen_name in src_2d_arrays:
            # the shape of the 2D array: [width, height, channels]
            buffer_shape = src_2d_arrays[self.__frame_gen_name][0]
            # the encoding information (Numpy dtype). This is for information only,
            # as the frame data below is already decoded
            dtype_obj = src_2d_arrays[self.__frame_gen_name][1]
            # the actual 2D array data
            frame_buffer = src_2d_arrays[self.__frame_gen_name][2]
            # convert the frame data into a format compatible with OpenCV
            self.__frame = frame_buffer.squeeze()

This class is composed of three functions: a constructor (__init__), the visualization function (draw_frame), and the callback function (event_callback).

The __init__ constructor stores two strings: the name of an event generator and the name of a frame generator. These names will be used to distinguish how to process the received data, in case there are multiple sources of input.

The draw_frame function simply displays the information stored in the __frame variable using OpenCV’s imshow.

The event_callback function is the one we will associate with the PythonConsumer for event processing. It is called every time a new buffer of events is available, and it takes three parameters:

  • ts: this is the timestamp of the end of the buffer. All events included in this callback will have a timestamp strictly lower than ts;

  • src_events: a dictionary containing one list for each component associated with the PythonConsumer. The key of each item in the dictionary is the name passed when adding a new source to the PythonConsumer. Each list is composed as follows:

    • [0] contains the type of the events contained in this buffer. E.g. Event2d;

    • [1] contains the Numpy dtype information: the size in bits of the event, the name of each field, the format of each field, and their offset. Note that the events are already decoded, so this information can be ignored;

    • [2] contains the array with the decoded events in this format (x, y, polarity, timestamp);

  • src_2d_arrays: contains similar information to src_events. The composition of each list is slightly different:

    • [0] contains the array dimensions in the following format [width, height, channels];

    • [1] contains the Numpy dtype information of the pixel information;

    • [2] contains the 2D array data.

As you can see, in this function we simply count the number of events received in src_events, print the first event, and store the frame data for later visualization.
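If you want to go further than printing the raw records, the decoded event buffer is a structured Numpy array, so individual fields can be accessed by name. Below is a small illustrative sketch; the field names 'x' and 't' are an assumption based on the (x, y, polarity, timestamp) layout described above, and should be checked against the dtype reported in the callback (dtype_obj):

import numpy as np

def print_event_stats(event_buffer):
    """
    Illustrative helper: compute simple statistics on a decoded event buffer.
    The field names below are assumptions; inspect dtype_obj to confirm them.
    """
    if len(event_buffer) == 0:
        return
    x = event_buffer['x']   # assumed field name for the x coordinate
    t = event_buffer['t']   # assumed field name for the timestamp
    print("Time span: {} us, mean x: {:.1f}".format(int(t[-1] - t[0]), float(np.mean(x))))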

Let’s now see the rest of the pipeline:

Initialization

As usual, let’s create the infrastructure to load the file and read the data (remember to change the path):

input_filename = "PATH_TO_DAT"
#input_filename = "PATH_TO_RAW"

# Check validity of input arguments
if not (path.exists(input_filename) and path.isfile(input_filename)):
    print("Error: provided input path '{}' does not exist or is not a file.".format(input_filename))
    sys.exit(1)

is_raw = input_filename.endswith('.raw')
is_dat = input_filename.endswith('.dat')

if not (is_raw or is_dat):
    print("Error: provided input path '{}' does not have the right extension. ".format(input_filename) +
            "It has either to be a .raw or a .dat file")
    sys.exit(1)

Building the pipeline

We will now create the pipeline by adding the different components to the controller.

Most of the pipeline is similar to the simple pipelines we have implemented in the previous tutorials:

controller = mvd_engine.Controller()

if is_dat:
    cd_producer = mvd_core.FileProducer(input_filename)
else:
    device = mv_hal.DeviceDiscovery.open_raw_file(input_filename)
    if not device:
        print("Error: could not open file '{}'.".format(input_filename))
        sys.exit(1)

    # Add the device interface to the pipeline
    interface = mvd_core.HalDeviceInterface(device)
    controller.add_device_interface(interface)

    cd_producer = mvd_core.CdProducer(interface)

    # Start the streaming of events
    i_events_stream = device.get_i_events_stream()
    i_events_stream.start()

# Add cd_producer to the pipeline
controller.add_component(cd_producer, "CD Producer")

# Create Frame Generator with 20ms accumulation time
frame_gen = mvd_core.FrameGenerator(cd_producer)
frame_gen.set_dt(20000)
controller.add_component(frame_gen, "FrameGenerator")

The main difference is the use of the PythonConsumer:

# We use PythonConsumer to "grab" the output of two components: cd_producer and frame_gen
# pyconsumer will call back the application each time it receives data, using the event_callback function
frame_gen_name = "FrameGen"
cd_prod_name = "CDProd"
ev_proc = EventProcessor(event_gen_name=cd_prod_name, frame_gen_name=frame_gen_name)

pyconsumer = mvd_core.PythonConsumer(ev_proc.event_callback)
pyconsumer.add_source(cd_producer, cd_prod_name)
pyconsumer.add_source(frame_gen, frame_gen_name)
controller.add_component(pyconsumer, "PythonConsumer")

Let’s analyze this part.

First, we instantiate the EventProcessor class we created earlier, passing two strings: one for the FrameGenerator and one for the CdProducer:

ev_proc = EventProcessor(event_gen_name=cd_prod_name, frame_gen_name=frame_gen_name)

Then we instantiate the PythonConsumer class and pass the function that will be used for the callback:

pyconsumer = mvd_core.PythonConsumer(ev_proc.event_callback)

We now have to add the sources to the PythonConsumer. Note how we need to use the same names specified before to ensure that we will be able to process the data correctly:

pyconsumer.add_source(cd_producer, cd_prod_name)
pyconsumer.add_source(frame_gen, frame_gen_name)

Finally, we add the PythonConsumer to the controller as with any other component:

controller.add_component(pyconsumer, "PythonConsumer")

The only missing part is now the main controller loop. Note how we manually draw the frame using the EventProcessor custom class.

controller.set_slice_duration(10000)
controller.set_batch_duration(40000)
do_sync = True

# Run the pipeline
while not controller.is_done():

    controller.run(do_sync)

    # Render frame
    ev_proc.draw_frame()

Output

The expected output is a display window showing the generated frames. This is the expected console output:

...
This callback contains 25653 events. The first event is (218, 312, 1, 210000)
This callback contains 25453 events. The first event is (236, 351, 1, 220000)
This callback contains 25733 events. The first event is (239, 246, 0, 230000)
This callback contains 25926 events. The first event is (326, 265, 1, 240000)
This callback contains 26495 events. The first event is (291, 309, 0, 250000)
This callback contains 26584 events. The first event is (197, 306, 0, 260000)
This callback contains 26820 events. The first event is (171, 301, 1, 270000)
This callback contains 27072 events. The first event is (214, 323, 1, 280000)
...

We can close the window to conclude the run.

cv2.destroyAllWindows()
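If you prefer to guarantee that the window is always closed, even when the playback is interrupted, one possible variant of the main loop (a sketch, not part of the original sample) is to wrap it in a try/finally block:

try:
    while not controller.is_done():
        controller.run(do_sync)
        # Render the latest frame
        ev_proc.draw_frame()
finally:
    # Ensure the OpenCV window is closed even if the loop is interrupted
    cv2.destroyAllWindows()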

Note

The PythonConsumer class allows you to do custom processing with events using Metavision Designer. However, the output of this processing cannot be re-fed into a Metavision Designer pipeline. In other words, the output of a PythonConsumer cannot be used as an input for another Designer component.

Currently, it is not possible to write your own custom components for Metavision Designer. If you want to implement your own custom processing, you should use the Metavision SDK.

Note

This tutorial was created using Jupyter Notebooks.

Download the source code.