Example of Building a Pipeline with a Custom Stage

The code sample Metavision CSV Viewer found in the Core module will be used here to show how to build a pipeline with a custom stage.

The goal of this pipeline is to read data from a CSV file (for example, one generated using the File to CSV sample) and visualize the data on screen.

The pipeline can be represented by this graph:

[Figure: graph of the CSV viewer pipeline (pipeline_csv_viewer.jpg)]

Implementing a Custom Processing Stage

This sample defines its own processing stage, CSVReadingStage, that reads events from a CSV file:

class CSVReadingStage : public Metavision::BaseStage {
public:
    CSVReadingStage(const std::string &filename) : ifs_(filename) {
        if (!ifs_.is_open()) {
            MV_LOG_ERROR() << "Unable to open " << filename;
            throw std::runtime_error("Unable to open " + filename);
        }
        if (!parse_csv_header()) {
            MV_LOG_ERROR() << "Error while parsing header of " << filename;
            throw std::runtime_error("Error while parsing header of " + filename);
        }

        cur_cd_buffer_ = cd_buffer_pool_.acquire();
        cur_cd_buffer_->clear();

        // this callback is called once the pipeline is started, so the stage knows it can start producing
        set_starting_callback([this]() {
            done_           = false;
            reading_thread_ = std::thread([this] { read(); });
        });

        // this callback is called once the pipeline is stopped : it can be initiated by a call
        // to @ref Pipeline::cancel() and/or after all stages are done and all task queues have been cleared
        set_stopping_callback([this]() {
            done_ = true;
            if (reading_thread_.joinable())
                reading_thread_.join();
        });
    }

    /// [PIPELINE_USAGE_READ_BEGIN]
    void read() {
        std::string line;
        while (!done_ && std::getline(ifs_, line)) {
            // once here, we know that the stage has not been stopped yet,
            // so we read a line and may produce a buffer
            std::istringstream iss(line);
            std::vector<std::string> tokens;
            std::string token;
            while (std::getline(iss, token, ',')) {
                tokens.push_back(token);
            }
            if (tokens.size() != 4) {
                MV_LOG_ERROR() << "Invalid line : <" << line << "> ignored";
            } else {
                if (cur_cd_buffer_->size() > 5000) {
                    // this is how a stage produces data to be consumed by the next stages
                    produce(cur_cd_buffer_);
                    cur_cd_buffer_ = cd_buffer_pool_.acquire();
                    cur_cd_buffer_->clear();
                }
                cur_cd_buffer_->emplace_back(static_cast<unsigned short>(std::stoul(tokens[0])),
                                             static_cast<unsigned short>(std::stoul(tokens[1])),
                                             static_cast<short>(std::stoi(tokens[2])), std::stoll(tokens[3]));
            }
        }
        // notifies to the pipeline that this producer has no more data to produce
        complete();
    }
    /// [PIPELINE_USAGE_READ_END]

    std::optional<int> get_width() const {
        return width_;
    }
    std::optional<int> get_height() const {
        return height_;
    }

private:
    bool read_cd_csv_header_line() {
        std::string line;
        if (std::getline(ifs_, line)) {
            std::istringstream iss(line);
            std::string key, value;
            std::vector<std::string> values;
            iss.ignore(1); // ignore leading '%'
            std::getline(iss, key, ':');
            while (std::getline(iss, value, ',')) {
                values.push_back(value);
            }
            if (key == "geometry") {
                if (values.size() == 2) {
                    width_  = std::stoi(values[0]);
                    height_ = std::stoi(values[1]);
                } else {
                    MV_LOG_ERROR() << "Ignoring invalid header line for key geometry, expected "
                                      "\"%geometry:<width>,<height>\", got: \""
                                   << line << "\"";
                }
            }
        }
        return ifs_.good();
    }

    bool parse_csv_header() {
        while (ifs_.peek() == '%') {
            if (!read_cd_csv_header_line()) {
                return false;
            }
        }
        return true;
    }

    std::optional<int> width_, height_;
    std::atomic<bool> done_;
    std::thread reading_thread_;
    std::ifstream ifs_;
    EventBufferPool cd_buffer_pool_;
    EventBufferPtr cur_cd_buffer_;
};

Let’s analyze the code.

First, let’s create a class that derives from BaseStage. This class will be the stage in charge of reading events from the CSV file and producing the events for the rest of the pipeline.

Here’s the beginning of the constructor:

    CSVReadingStage(const std::string &filename) : ifs_(filename) {

The constructor for CSVReadingStage calls the default constructor for BaseStage, BaseStage::BaseStage(bool detachable = true). Setting detachable to true means that BaseStage::detach() can be called on this stage, which makes the pipeline use a dedicated thread for this stage. The pipeline uses this thread to call the stage's callbacks. If, on the other hand, you want your stage to run in the main thread, you can set this value to false to declare that the stage is not detachable. This is useful for some kinds of processing, such as interfacing with some GUIs.

In the next lines the constructor makes sure the file is valid and open:

        if (!ifs_.is_open()) {
            MV_LOG_ERROR() << "Unable to open " << filename;
            throw std::runtime_error("Unable to open " + filename);
        }

Next, it creates a buffer used to store the events read from the CSV file. The buffer is acquired from an object pool, BoundedSharedObjectPool. For an explanation of the object pool pattern, refer to the Wikipedia page. Our implementation returns smart pointers that give the resource back to the pool when the pointer goes out of scope.

        cur_cd_buffer_ = cd_buffer_pool_.acquire();
        cur_cd_buffer_->clear();
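
The pool behaviour described above can be illustrated with a minimal, SDK-independent sketch. The SimplePool class below is hypothetical and is not the SDK's BoundedSharedObjectPool; it only shows, under that assumption, how a shared pointer with a custom deleter can hand an object back to its pool when the last pointer goes out of scope.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical minimal pool, NOT the SDK's BoundedSharedObjectPool.
// The free list lives in a shared block so that objects released after
// the pool itself has been destroyed are still handled safely.
template<typename T>
class SimplePool {
public:
    using Ptr = std::shared_ptr<T>;

    explicit SimplePool(std::size_t initial_size = 0) : state_(std::make_shared<State>()) {
        for (std::size_t i = 0; i < initial_size; ++i)
            state_->free_objects.push_back(std::make_unique<T>());
    }

    // Returns a pooled object, or allocates a new one if the pool is empty.
    Ptr acquire() {
        std::unique_ptr<T> obj;
        {
            std::lock_guard<std::mutex> lock(state_->mutex);
            if (!state_->free_objects.empty()) {
                obj = std::move(state_->free_objects.back());
                state_->free_objects.pop_back();
            }
        }
        if (!obj)
            obj = std::make_unique<T>();

        // The custom deleter gives the object back to the pool instead of destroying it.
        std::shared_ptr<State> state = state_;
        return Ptr(obj.release(), [state](T *raw) {
            std::lock_guard<std::mutex> lock(state->mutex);
            state->free_objects.push_back(std::unique_ptr<T>(raw));
        });
    }

    // Number of objects currently available in the pool.
    std::size_t available() const {
        std::lock_guard<std::mutex> lock(state_->mutex);
        return state_->free_objects.size();
    }

private:
    struct State {
        std::mutex mutex;
        std::vector<std::unique_ptr<T>> free_objects;
    };
    std::shared_ptr<State> state_;
};
```

In this sketch an object that returns to the pool keeps its previous content, so a caller that reuses it has to reset it first. That is consistent with the sample calling clear() right after acquire(), although the SDK pool may behave differently.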

Note that a stage can have four possible types of callbacks.

In our stage, we only need the starting and stopping callbacks, which manage the thread that CSVReadingStage uses to read the input file:

        // this callback is called once the pipeline is started, so the stage knows it can start producing
        set_starting_callback([this]() {
            done_           = false;
            reading_thread_ = std::thread([this] { read(); });
        });

        // this callback is called once the pipeline is stopped : it can be initiated by a call
        // to @ref Pipeline::cancel() and/or after all stages are done and all task queues have been cleared
        set_stopping_callback([this]() {
            done_ = true;
            if (reading_thread_.joinable())
                reading_thread_.join();
        });
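
The start/stop handshake used by these two callbacks can be tried in isolation. The sketch below does not use the SDK: Worker is a hypothetical class whose start() and stop() methods play the roles of the starting and stopping callbacks, with an atomic flag telling the thread when to exit.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical stand-in for a stage that owns a worker thread; it does not use the SDK.
class Worker {
public:
    // Mirrors the starting callback: reset the flag, then launch the thread.
    // This sketch assumes start() is not called again before stop().
    void start() {
        done_   = false;
        thread_ = std::thread([this] { run(); });
    }

    // Mirrors the stopping callback: raise the flag, then wait for the thread to exit.
    void stop() {
        done_ = true;
        if (thread_.joinable())
            thread_.join();
    }

    // Make sure the thread is joined even if stop() was never called explicitly.
    ~Worker() {
        stop();
    }

    int iterations() const {
        return iterations_;
    }

private:
    // Thread body: loops until the flag is raised, like CSVReadingStage::read().
    void run() {
        while (!done_) {
            ++iterations_;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::atomic<bool> done_{true};
    std::atomic<int> iterations_{0};
    std::thread thread_;
};
```

Checking joinable() before join() makes stop() safe to call when the thread was never started, which is why the same check appears in the stopping callback of the sample.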

Next, CSVReadingStage::read() is defined. This function is called from the reading_thread_ thread, in order to read the content of the file and “produce” data to be fed to the next stages.

    void read() {
        std::string line;
        while (!done_ && std::getline(ifs_, line)) {
            // once here, we know that the stage has not been stopped yet,
            // so we read a line and may produce a buffer
            std::istringstream iss(line);
            std::vector<std::string> tokens;
            std::string token;
            while (std::getline(iss, token, ',')) {
                tokens.push_back(token);
            }
            if (tokens.size() != 4) {
                MV_LOG_ERROR() << "Invalid line : <" << line << "> ignored";
            } else {
                if (cur_cd_buffer_->size() > 5000) {
                    // this is how a stage produces data to be consumed by the next stages
                    produce(cur_cd_buffer_);
                    cur_cd_buffer_ = cd_buffer_pool_.acquire();
                    cur_cd_buffer_->clear();
                }
                cur_cd_buffer_->emplace_back(static_cast<unsigned short>(std::stoul(tokens[0])),
                                             static_cast<unsigned short>(std::stoul(tokens[1])),
                                             static_cast<short>(std::stoi(tokens[2])), std::stoll(tokens[3]));
            }
        }
        // notifies to the pipeline that this producer has no more data to produce
        complete();
    }

As you can see, this function consists of a loop that keeps running as long as the stage has not been stopped and there are still lines to read from the file:

while (!done_ && std::getline(ifs_, line)) {
    // ...
}
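
The per-line parsing done inside this loop can be isolated into a small helper, sketched below without the SDK. CsvEvent and parse_event_line are hypothetical names introduced for illustration; unlike the sample, the sketch also catches the exceptions that std::stoul, std::stoi and std::stoll throw on non-numeric or out-of-range fields.

```cpp
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical plain struct standing in for the SDK's CD event type.
struct CsvEvent {
    unsigned short x;
    unsigned short y;
    short p;
    long long t;
};

// Parses one "x,y,p,t" line into `event`.
// Returns false (and leaves `event` untouched) if the line is malformed.
bool parse_event_line(const std::string &line, CsvEvent &event) {
    std::istringstream iss(line);
    std::vector<std::string> tokens;
    std::string token;
    while (std::getline(iss, token, ','))
        tokens.push_back(token);

    if (tokens.size() != 4)
        return false;

    try {
        CsvEvent parsed{static_cast<unsigned short>(std::stoul(tokens[0])),
                        static_cast<unsigned short>(std::stoul(tokens[1])),
                        static_cast<short>(std::stoi(tokens[2])), std::stoll(tokens[3])};
        event = parsed;
        return true;
    } catch (const std::exception &) {
        // std::invalid_argument or std::out_of_range from the conversions
        return false;
    }
}
```

With this helper, a line such as 10,20,1,1000 yields an event, while a line with three fields or with non-numeric fields is rejected instead of terminating the reading thread with an uncaught exception.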

In addition to the loop needed to load and parse the CSV file, there are two important function calls: BaseStage::produce() and BaseStage::complete(). A stage calls the BaseStage::produce() function when it has produced data that should be fed to the next stages, passing the data as the function parameter:

// this is how a stage produces data to be consumed by the next stages
produce(cur_cd_buffer_);

Then, the stage calls BaseStage::complete() when it has finished producing data, so that the pipeline can stop once the next stages have finished consuming it:

// notifies to the pipeline that this producer has no more data to produce
complete();
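
The produce-then-complete pattern can be sketched independently of the SDK. In the hypothetical BatchingProducer below, the two std::function callbacks stand in for BaseStage::produce() and BaseStage::complete(). One deliberate difference from the sample: the sketch also hands over the last, partially filled batch before signalling completion.

```cpp
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical batching producer; the callbacks stand in for
// BaseStage::produce() and BaseStage::complete().
class BatchingProducer {
public:
    using Batch = std::vector<int>;

    BatchingProducer(std::size_t max_batch_size, std::function<void(Batch)> on_produce,
                     std::function<void()> on_complete) :
        max_batch_size_(max_batch_size),
        on_produce_(std::move(on_produce)),
        on_complete_(std::move(on_complete)) {}

    // Hands the current batch downstream once it is full, then stores the new item.
    void push(int item) {
        if (!batch_.empty() && batch_.size() >= max_batch_size_)
            flush();
        batch_.push_back(item);
    }

    // Flushes what is left (a choice of this sketch), then signals the end of the data.
    void finish() {
        if (!batch_.empty())
            flush();
        on_complete_();
    }

private:
    void flush() {
        on_produce_(std::move(batch_));
        batch_ = Batch(); // put the moved-from vector back into a known empty state
    }

    std::size_t max_batch_size_;
    std::function<void(Batch)> on_produce_;
    std::function<void()> on_complete_;
    Batch batch_;
};
```

Pushing five items with a batch size of two produces two full batches during the pushes and a final batch of one item when finish() is called, after which the completion callback runs exactly once.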

Connecting Stages into a Pipeline

Let's now move on to the sample's main function, where the stages are added to the pipeline and connected:

int main(int argc, char *argv[]) {
    std::string in_csv_file_path;
    int width, height;

    const std::string program_desc(
        "Code sample demonstrating how to use Metavision SDK to display events from a CSV file.\n");

    po::options_description options_desc("Options");
    // clang-format off
    options_desc.add_options()
        ("help,h", "Produce help message.")
        ("input-csv-file,i", po::value<std::string>(&in_csv_file_path)->required(), "Path to input CSV file")
        ("width",            po::value<int>(&width)->default_value(1280), "Width of the sensor associated to the CSV file")
        ("height",           po::value<int>(&height)->default_value(720), "Height of the sensor associated to the CSV file")
        ;
    // clang-format on

    po::variables_map vm;
    try {
        po::store(po::command_line_parser(argc, argv).options(options_desc).run(), vm);
        po::notify(vm);
    } catch (po::error &e) {
        MV_LOG_ERROR() << program_desc;
        MV_LOG_ERROR() << options_desc;
        MV_LOG_ERROR() << "Parsing error:" << e.what();
        return 1;
    }

    if (vm.count("help")) {
        MV_LOG_INFO() << program_desc;
        MV_LOG_INFO() << options_desc;
        return 0;
    }

    /// Pipeline
    //
    //  0 (Csv Reading) -->-- 1 (Frame Generation) -->-- 2 (Display)
    //

    // A pipeline for which all added stages will automatically be run in their own processing threads (if applicable)
    Metavision::Pipeline p(true);

    // 0) Stage producing events from a CSV file
    auto csv_reader = std::make_unique<CSVReadingStage>(in_csv_file_path);
    if (auto width_opt = csv_reader->get_width()) {
        width = *width_opt;
    }
    if (auto height_opt = csv_reader->get_height()) {
        height = *height_opt;
    }
    auto &csv_stage = p.add_stage(std::move(csv_reader));

    // 1) Stage generating a frame with events previously produced using accumulation time of 10ms
    auto &frame_stage = p.add_stage(std::make_unique<Metavision::FrameGenerationStage>(width, height, 10), csv_stage);

    // 2) Stage displaying the generated frame
    auto &disp_stage =
        p.add_stage(std::make_unique<Metavision::FrameDisplayStage>("CD events", width, height), frame_stage);

    // Run the pipeline and wait for its completion
    p.run();

    return 0;
}

First, the Pipeline is instantiated using the Pipeline::Pipeline(bool auto_detach) constructor:

// A pipeline for which all added stages will automatically be run in their own processing threads (if applicable)
Metavision::Pipeline p(true);

The auto_detach argument indicates whether the pipeline should automatically call BaseStage::detach() on detachable stages in the pipeline.

Then, we add our custom stage, as with any other stage, by using template<typename Stage> Stage &Pipeline::add_stage(std::unique_ptr<Stage> &&stage). The stage is created first so that the width and height read from the CSV header, when present, can override the command-line values before the stage is moved into the pipeline:

// 0) Stage producing events from a CSV file
auto csv_reader = std::make_unique<CSVReadingStage>(in_csv_file_path);
// ...
auto &csv_stage = p.add_stage(std::move(csv_reader));

To add the following stages and connect them, we use an overload of add_stage, template<typename Stage> Stage &Pipeline::add_stage(std::unique_ptr<Stage> &&stage, BaseStage &prev_stage), which calls BaseStage::set_previous_stage right after adding the stage to the pipeline:

// 1) Stage generating a frame with events previously produced using accumulation time of 10ms
auto &frame_stage = p.add_stage(std::make_unique<Metavision::FrameGenerationStage>(width, height, 10), csv_stage);

// 2) Stage displaying the generated frame
auto &disp_stage =
    p.add_stage(std::make_unique<Metavision::FrameDisplayStage>("CD events", width, height), frame_stage);
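
The ownership pattern behind add_stage (the pipeline takes the std::unique_ptr, the caller keeps a reference) can be sketched with toy types. ToyStage and ToyPipeline below are hypothetical and are not the SDK classes; the `previous` pointer is only an assumption standing in for whatever BaseStage::set_previous_stage records.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy stage, NOT Metavision::BaseStage.
struct ToyStage {
    explicit ToyStage(std::string n) : name(std::move(n)) {}
    virtual ~ToyStage() = default;

    std::string name;
    ToyStage *previous = nullptr; // stand-in for the link set by set_previous_stage
};

// Hypothetical toy pipeline, NOT Metavision::Pipeline.
class ToyPipeline {
public:
    // Takes ownership of the stage and returns a reference that stays valid
    // for the lifetime of the pipeline (the stage itself lives on the heap).
    template<typename Stage>
    Stage &add_stage(std::unique_ptr<Stage> &&stage) {
        Stage &ref = *stage;
        stages_.push_back(std::move(stage));
        return ref;
    }

    // Same as above, then connects the new stage to the one that feeds it.
    template<typename Stage>
    Stage &add_stage(std::unique_ptr<Stage> &&stage, ToyStage &prev_stage) {
        Stage &ref   = add_stage(std::move(stage));
        ref.previous = &prev_stage;
        return ref;
    }

    std::size_t count() const {
        return stages_.size();
    }

private:
    std::vector<std::unique_ptr<ToyStage>> stages_;
};
```

Returning a reference lets each add_stage call pass the previously added stage straight to the next call, which is the chaining style used in the sample's main function.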

Running the Pipeline

Finally, once the pipeline is set up, it is started by calling Pipeline::run() from the main thread. The pipeline then keeps running until completion or until it is cancelled.

p.run();