Detection and Tracking Tutorial

In this tutorial, we learn how to use a pre-trained network to detect and track cars and pedestrians. By the end of this tutorial, we will have a full pipeline that takes events as input and outputs the position and ID of the cars and pedestrians in the field of view. Note that we also provide a detection and tracking sample.

The event dataflow is the following:

  • Read the events from an event-based camera or a RAW/DAT file

  • Preprocess the events

  • Two parallel branches: one for ML detection and another for the data association:

    • Runtime inference to extract the Detection from preprocessed events

    • Noise Filtering (STC, Trail…) + Data association

  • Merge all the results and display them

This is the pipeline:

(Figure: the Detection and Tracking pipeline)

Let’s start by loading the required libraries and some data:

import os
import cv2
import numpy as np

import torch

# Import of Metavision Machine Learning binding
import metavision_sdk_ml
import metavision_sdk_cv
from metavision_sdk_core import EventBbox

Here is the link to download the RAW file used in this sample: driving_sample.raw

SEQUENCE_FILENAME_RAW = "driving_sample.raw"
assert os.path.isfile(SEQUENCE_FILENAME_RAW)

Pipeline Components

Event Producer

The event producer generates a stream of events to feed the pipeline.

We use a metavision_core.event_io.EventsIterator object to produce the stream of events. We choose to process the data 10 ms at a time, as this gives a good trade-off between low latency and performance.

from metavision_core.event_io import EventsIterator

DELTA_T = 10000  # 10 ms

def init_event_producer():
    return EventsIterator(SEQUENCE_FILENAME_RAW, start_ts=0, delta_t=DELTA_T, relative_timestamps=False)

# initialize an iterator to get the sensor size
mv_it = init_event_producer()
ev_height, ev_width = mv_it.get_size()

print("Dimensions:", ev_width, ev_height)

  Dimensions: 1280 720

Detection

The next block to build is the detection block. The detection is done using a neural network inference based on PyTorch. The provided model is a Torch model saved using jit.save().

You can use this model directly with Torch, but our metavision_ml.detection_tracking.object_detector.ObjectDetector class provides additional features, such as result extraction and parsing, thresholding, non-maximum suppression, and more. First, we need a pre-trained TorchScript model with a JSON file of hyperparameters. Check our pre-trained models page to find out how to download the object detection TorchScript model. Move the folder red_event_cube_05_2020 to your local folder or update the path in the following code.

Now, we need to load the model and define some parameters.

NN_MODEL_DIRECTORY = os.path.abspath(os.path.join(os.getcwd(), "red_event_cube_05_2020"))

# check whether we can use the GPU or we should fall back on the CPU
DEVICE = "cpu"  # "cpu", "cuda" (or "cuda:0", "cuda:1", etc.)
if torch.cuda.is_available():
    DEVICE = "cuda"

NN_DOWNSCALE_FACTOR = 2 # divide the events input height and width by this factor before applying the NN; this gives a good trade-off between accuracy and performance
DETECTOR_SCORE_THRESHOLD = 0.4 # ignore all detections below this threshold
NMS_IOU_THRESHOLD = 0.4 # apply Non-Maximum Suppression when the intersection over union (IOU) is above this threshold
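
As a quick sanity check, we can verify that the model folder is present at that path before loading it:

# make sure the pre-trained model folder has been downloaded where the code expects it
assert os.path.isdir(NN_MODEL_DIRECTORY), f"Model folder not found: {NN_MODEL_DIRECTORY}"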

Now, let’s load the model and create an ObjectDetector

from metavision_ml.detection_tracking import ObjectDetector

network_input_width  = ev_width  // NN_DOWNSCALE_FACTOR
network_input_height = ev_height // NN_DOWNSCALE_FACTOR

object_detector = ObjectDetector(NN_MODEL_DIRECTORY,
                                 events_input_width=ev_width,
                                 events_input_height=ev_height,
                                 runtime=DEVICE,
                                 network_input_width=network_input_width,
                                 network_input_height=network_input_height)
object_detector.set_detection_threshold(DETECTOR_SCORE_THRESHOLD)
object_detector.set_iou_threshold(NMS_IOU_THRESHOLD)

Now that we have a detector object, we need to set up a function to use it. Note how, in this case, the object detector has its own data preprocessing component, which we can get with the get_cd_processor() function, so we do not need any other preprocessing tools. It is also possible to load the model directly with the Torch functions; in that case, you would need to prepare the input data with our preprocessing tools, as presented in the Event Preprocessing tutorial. This direct Torch approach is presented in the Reusing a Torchjit Model tutorial.
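
For reference, here is a minimal sketch of the direct Torch route. The file name model.ptjit inside the pre-trained model folder is an assumption; the actual layout is covered in the Reusing a Torchjit Model tutorial.

# Minimal sketch: load the TorchScript model directly with Torch.
# The file name "model.ptjit" is an assumption; adapt it to the actual
# content of the pre-trained model folder. With this route, the event
# preprocessing must be done separately (see the Event Preprocessing tutorial).
jit_model_path = os.path.join(NN_MODEL_DIRECTORY, "model.ptjit")
if os.path.isfile(jit_model_path):
    jit_model = torch.jit.load(jit_model_path, map_location=DEVICE)
    jit_model.eval()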

Let’s get the preprocessing component of our object detector.

cdproc = object_detector.get_cd_processor()
frame_buffer = cdproc.init_output_tensor()
print("frame_buffer.shape: ", frame_buffer.shape)
assert frame_buffer.shape == (10, network_input_height, network_input_width)
assert (frame_buffer == 0).all()

  frame_buffer.shape:  (10, 360, 640)

Finally, we can define the detection function. At each iteration, we process the events to prepare them for the network. This is done with the cdproc.process_events() function, which incrementally builds the input tensor before it is fed to the neural network. current_frame_start_ts is used to keep track of the starting timestamp of the current frame_buffer.

The call to object_detector.process() is not done at each 10 ms iteration. It is done at a frequency defined for each model at training time, which can be obtained with the get_accumulation_time() function.

NN_accumulation_time = object_detector.get_accumulation_time()
def generate_detection(ts, ev):
    current_frame_start_ts = ((ts-1) // NN_accumulation_time) * NN_accumulation_time
    cdproc.process_events(current_frame_start_ts, ev, frame_buffer)

    detections = np.empty(0, dtype=EventBbox)

    if ts % NN_accumulation_time == 0:  # call the network only when defined
        # call neural network to detect objects
        detections = object_detector.process(ts, frame_buffer)
        # reset neural network input frame
        frame_buffer.fill(0)
    return detections
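
To make the timing concrete, here is a quick worked example of how the timestamps align, assuming an accumulation time of 50 ms (the real value comes from get_accumulation_time()):

# Worked example of the timestamp alignment used above
# (assuming NN_accumulation_time == 50000 us; the real value comes from
# object_detector.get_accumulation_time()).
acc = 50000
for ts in (10000, 50000, 60000, 100000):
    frame_start = ((ts - 1) // acc) * acc
    runs_network = (ts % acc == 0)
    print(ts, frame_start, runs_network)
# 10000  -> frame_start = 0,     runs_network = False
# 50000  -> frame_start = 0,     runs_network = True
# 60000  -> frame_start = 50000, runs_network = False
# 100000 -> frame_start = 50000, runs_network = True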

At this point, we have the building blocks to read the events, preprocess them, and run the detector to obtain the position of cars and pedestrians. The information obtained from the network only refers to the particular batch of events we passed to the detector. If we want to associate an ID with each detected object and track it over time, we need to use the DataAssociation class. We will see how to do this in the next sections.

Noise Filtering

The data association component works better if we first remove all “noisy” events. We can do this in many different ways, but in this tutorial we will use the TrailFilterAlgorithm. This filter works as follows: for each pixel, we only keep the first event received; all subsequent events at that pixel that arrive within a temporal threshold defined by the user are discarded.

TRAIL_THRESHOLD = 10000  # in us (10 ms)
trail = metavision_sdk_cv.TrailFilterAlgorithm(width=ev_width, height=ev_height, threshold=TRAIL_THRESHOLD)

We can now create the building block for filtering the events.

# Filter done after the detection
ev_filtered_buffer = trail.get_empty_output_buffer()
def noise_filter(ev):
    # apply trail filter
    trail.process_events(ev, ev_filtered_buffer)
    return ev_filtered_buffer.numpy()
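
As mentioned in the pipeline overview, an STC filter could be used instead of the trail filter. A sketch of that variant is shown below; the constructor arguments are assumed to follow the same width/height/threshold pattern as the trail filter, so check the metavision_sdk_cv API reference for the exact signature.

# Alternative noise filter using the STC algorithm (a sketch: the constructor
# arguments are assumed to mirror the trail filter; verify them against the
# metavision_sdk_cv API reference).
STC_THRESHOLD = 10000  # in us
stc = metavision_sdk_cv.SpatioTemporalContrastAlgorithm(ev_width, ev_height, STC_THRESHOLD)
stc_filtered_buffer = stc.get_empty_output_buffer()

def stc_noise_filter(ev):
    # apply STC filter
    stc.process_events(ev, stc_filtered_buffer)
    return stc_filtered_buffer.numpy()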

Data Association

Now that we have the correct input for the data association block, we can initialize it like this:

def init_data_association():
    return metavision_sdk_ml.DataAssociation(width=ev_width, height=ev_height, max_iou_inter_track=0.3)

At this point, we have all the building blocks for detection and tracking. What we are missing is a way to visualize the results.

Video Output Generation

First, let’s enable the visualization (you can set the DOC_DISPLAY environment variable to OFF if you want to run this script without a display) and define the output path.

DO_DISPLAY = True and os.environ.get("DOC_DISPLAY", 'ON') != "OFF" # display the result in a window
OUTPUT_VIDEO = ""  # output video (disabled if the string is empty. Set a file name to save the video)

Video generation is done using scikit-video, which provides a wrapper around the FFmpeg library.

import numpy as np
# Temporary solution to fix the numpy deprecated alias in skvideo: https://github.com/scikit-video/scikit-video/issues/154#issuecomment-1445239790
# Will be deleted in MV-2134 when skvideo makes the correction
np.float = np.float64
np.int = np.int_
from skvideo.io import FFmpegWriter

Now, we can create the initialization function for the visualization building block:

def init_output():
    if OUTPUT_VIDEO:
        assert OUTPUT_VIDEO.lower().endswith(".mp4"), "Video should be mp4"

    if DO_DISPLAY:
        cv2.namedWindow("Detection and Tracking", cv2.WINDOW_NORMAL)

    return FFmpegWriter(OUTPUT_VIDEO) if OUTPUT_VIDEO else None

if OUTPUT_VIDEO or DO_DISPLAY:
    frame = np.zeros((ev_height, ev_width * 2, 3), dtype=np.uint8)

Note how we initialized the frame as a zero-filled NumPy array of the size of the expected output: twice the sensor width, so that two views can be shown side by side.

Finally, we can create the visualization building block. We use an internal function called draw_detections_and_tracklets, which takes as input the detections and the tracklets (the output of the data association block) and creates a clean visualization.

As you can see below, we first create the output with the draw_detections_and_tracklets function, and then we optionally display it or store it in a video (or both).

from metavision_ml.detection_tracking import draw_detections_and_tracklets
from metavision_sdk_core import BaseFrameGenerationAlgorithm

def generate_display(ts, ev, detections, tracklets, process_video):
    if OUTPUT_VIDEO or DO_DISPLAY:
        # build image frame
        BaseFrameGenerationAlgorithm.generate_frame(ev, frame[:, :ev_width])
        frame[:, ev_width:] = frame[:, :ev_width]
        draw_detections_and_tracklets(ts=ts, frame=frame, width=ev_width, height=ev_height,
                                         detections=detections, tracklets=tracklets)

    if DO_DISPLAY:
        # display image on screen
        cv2.imshow('Detection and Tracking', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            return False

    if OUTPUT_VIDEO:
        # write video
        process_video.writeFrame(frame[...,::-1].astype(np.uint8))

    return True

def end_display(process_video):
    # close video and window
    if OUTPUT_VIDEO:
        process_video.close()

    if DO_DISPLAY:
        cv2.destroyAllWindows()

Creating the Final Pipeline

Now that we have all the building blocks, we can instantiate the pipeline and execute it:

mv_it = init_event_producer() # initialize the iterator to read the events
object_detector.reset() # reset the object detector internal memory before processing a sequence
data_assoc = init_data_association() #  initialize the data association block
data_assoc_buffer = data_assoc.get_empty_output_buffer()
process_video = init_output() # initialize the video generation

END_TS = 10 * 1e6 # process sequence until this timestamp (None to disable)

for ev in mv_it:
    ts = mv_it.get_current_time()

    if END_TS and ts > END_TS:
        break

    # run the detectors and get the output
    detections = generate_detection(ts, ev)

    # remove noisy events for processing with the data association block
    noise_filtered_ev = noise_filter(ev)

    # compute tracklets
    data_assoc.process_events(ts, noise_filtered_ev, detections, data_assoc_buffer)
    tracklets = data_assoc_buffer.numpy()

    if not generate_display(ts, ev, detections, tracklets, process_video):
        # if the generation is stopped using `q`, break the loop
        break

# finalize the recording
end_display(process_video)

Reloading Detections from a Text File

In this section, we demonstrate how the same pipeline can be used with detection boxes gathered from an external source. Here, we use the results of a previous run of our pipeline, stored in a CSV file.

The same process could be used with any external object detector or ground truth, as long as the predicted bounding boxes are converted into a numpy array with the metavision_sdk_core.EventBbox dtype, as in the sketch below.
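
For illustration, here is a minimal sketch of how such an array could be built by hand. The field names (t, x, y, w, h, class_id, track_id, class_confidence) are assumptions; check EventBbox.dtype.names on your installation, and note that the class_id mapping depends on the model.

import numpy as np
from metavision_sdk_core import EventBbox

# Minimal sketch of a hand-built detections array (field names are assumed;
# check EventBbox.dtype.names on your installation).
my_detections = np.zeros(1, dtype=EventBbox)
my_detections[0]["t"] = 50000           # timestamp in us
my_detections[0]["x"] = 320.0           # top-left corner x
my_detections[0]["y"] = 180.0           # top-left corner y
my_detections[0]["w"] = 80.0            # box width in pixels
my_detections[0]["h"] = 40.0            # box height in pixels
my_detections[0]["class_id"] = 1        # label index (mapping depends on the model)
my_detections[0]["class_confidence"] = 0.9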

Here is the link to download the file used in this tutorial: driving_sample_detections.txt

First, let’s load the detections from a CSV file using the detections_csv_loader() function:

import os
from metavision_ml.detection_tracking import detections_csv_loader

offline_detections = "driving_sample_detections.txt"

NN_ACCUMULATION_TIME = 50000  # accumulation time (in us) used when the stored detections were generated

dic_ts_eventbbox = detections_csv_loader(offline_detections)
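
We can quickly inspect what the loader returned; dic_ts_eventbbox is expected to map a timestamp to the numpy array of EventBbox detected at that timestamp:

# Quick inspection of the loaded detections (dic_ts_eventbbox maps a
# timestamp in us to a numpy array of EventBbox).
for t in sorted(dic_ts_eventbbox.keys())[:3]:
    print(t, dic_ts_eventbbox[t].shape)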

Now we can create a function that loads the detections at the correct timestamp:

def load_detection(ts, ev):
    detections = np.empty(0, dtype=EventBbox)
    if ts % NN_ACCUMULATION_TIME == 0:
        if ts in dic_ts_eventbbox:
            detections = dic_ts_eventbbox[ts]
    return detections

Finally, here is the new pipeline. This pipeline is the same as the one presented in the previous section, but with the detector replaced with our load_detection function.

mv_it = init_event_producer() # initialize the iterator to read the events
data_assoc = init_data_association() #  initialize the data association block
data_assoc_buffer = data_assoc.get_empty_output_buffer()
process_video = init_output() # initialize the video generation

END_TS = 2 * 1e6 # process sequence until this timestamp (None to disable)

for ev in mv_it:
    ts = mv_it.get_current_time()
    if END_TS and ts > END_TS:
        break

    # load the precomputed detections
    detections = load_detection(ts, ev)

    # remove noisy events for processing with the data association block
    noise_filtered_ev = noise_filter(ev)

    # compute tracklets
    data_assoc.process_events(ts, noise_filtered_ev, detections, data_assoc_buffer)
    tracklets = data_assoc_buffer.numpy()

    if not generate_display(ts, ev, detections, tracklets, process_video):
        # if the generation is stopped using `q`, break the loop
        break

end_display(process_video)