Example of Building a Pipeline with a Custom Stage
The code sample Metavision CSV Viewer found in the Core module will be used here to show how to build a pipeline with a custom stage.
The goal of this pipeline is to read data from a CSV file (for example generated using the File to CSV sample) and visualize the data on screen.
The pipeline can be represented by this graph:
0 (Csv Reading) -->-- 1 (Frame Generation) -->-- 2 (Display)
Implementing a Custom Processing Stage
This sample defines its own processing stage, CSVReadingStage, which reads events from a CSV file:
class CSVReadingStage : public Metavision::BaseStage {
public:
    CSVReadingStage(const std::string &filename) : ifs_(filename) {
        if (!ifs_.is_open()) {
            MV_LOG_ERROR() << "Unable to open " << filename;
            throw std::runtime_error("Unable to open " + filename);
        }
        if (!parse_csv_header()) {
            MV_LOG_ERROR() << "Error while parsing header of " << filename;
            throw std::runtime_error("Error while parsing header of " + filename);
        }

        cur_cd_buffer_ = cd_buffer_pool_.acquire();
        cur_cd_buffer_->clear();

        // this callback is called once the pipeline is started, so the stage knows it can start producing
        set_starting_callback([this]() {
            done_           = false;
            reading_thread_ = std::thread([this] { read(); });
        });

        // this callback is called once the pipeline is stopped : it can be initiated by a call
        // to @ref Pipeline::cancel() and/or after all stages are done and all task queues have been cleared
        set_stopping_callback([this]() {
            done_ = true;
            if (reading_thread_.joinable())
                reading_thread_.join();
        });
    }

    /// [PIPELINE_USAGE_READ_BEGIN]
    void read() {
        std::string line;
        while (!done_ && std::getline(ifs_, line)) {
            // once here, we know that the stage has not been stopped yet,
            // so we read a line and may produce a buffer
            std::istringstream iss(line);
            std::vector<std::string> tokens;
            std::string token;
            while (std::getline(iss, token, ',')) {
                tokens.push_back(token);
            }
            if (tokens.size() != 4) {
                MV_LOG_ERROR() << "Invalid line : <" << line << "> ignored";
            } else {
                if (cur_cd_buffer_->size() > 5000) {
                    // this is how a stage produces data to be consumed by the next stages
                    produce(cur_cd_buffer_);
                    cur_cd_buffer_ = cd_buffer_pool_.acquire();
                    cur_cd_buffer_->clear();
                }
                cur_cd_buffer_->emplace_back(static_cast<unsigned short>(std::stoul(tokens[0])),
                                             static_cast<unsigned short>(std::stoul(tokens[1])),
                                             static_cast<short>(std::stoi(tokens[2])), std::stoll(tokens[3]));
            }
        }
        // notifies to the pipeline that this producer has no more data to produce
        complete();
    }
    /// [PIPELINE_USAGE_READ_END]

    std::optional<int> get_width() const {
        return width_;
    }

    std::optional<int> get_height() const {
        return height_;
    }

private:
    bool read_cd_csv_header_line() {
        std::string line;
        if (std::getline(ifs_, line)) {
            std::istringstream iss(line);
            std::string key, value;
            std::vector<std::string> values;
            iss.ignore(1); // ignore leading '%'
            std::getline(iss, key, ':');
            while (std::getline(iss, value, ',')) {
                values.push_back(value);
            }
            if (key == "geometry") {
                if (values.size() == 2) {
                    width_  = std::stoi(values[0]);
                    height_ = std::stoi(values[1]);
                } else {
                    MV_LOG_ERROR() << "Ignoring invalid header line for key geometry, expected "
                                      "\"%geometry:<width>,<height>\", got: \""
                                   << line << "\"";
                }
            }
        }
        return ifs_.good();
    }

    bool parse_csv_header() {
        while (ifs_.peek() == '%') {
            if (!read_cd_csv_header_line()) {
                return false;
            }
        }
        return true;
    }

    std::optional<int> width_, height_;
    std::atomic<bool> done_;
    std::thread reading_thread_;
    std::ifstream ifs_;
    EventBufferPool cd_buffer_pool_;
    EventBufferPtr cur_cd_buffer_;
};
Let’s analyze the code.
First, let’s create a class that derives from BaseStage. This class will be the stage in charge of reading events from the CSV file and producing the events for the rest of the pipeline.
Here’s the beginning of the constructor:
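class CSVReadingStage : public Metavision::BaseStage {
public:
    CSVReadingStage(const std::string &filename) : ifs_(filename) {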
The constructor for CSVReadingStage calls the default constructor for BaseStage, BaseStage::BaseStage(bool detachable = true).
Setting detachable to true means that BaseStage::detach() can be called on this stage, which makes the pipeline use a dedicated thread for this stage. The pipeline uses this thread to call the stage’s callbacks.
If, on the other hand, you want your stage to run in the main thread, you can set this value to false to declare that it is not detachable.
This is useful for some processing, such as interfacing with some GUIs.
In the next lines the constructor makes sure the file is valid and open:
if (!ifs_.is_open()) {
    MV_LOG_ERROR() << "Unable to open " << filename;
    throw std::runtime_error("Unable to open " + filename);
}
Next, it creates a buffer used to store the events. In particular, it acquires the buffer from an object pool, BoundedSharedObjectPool. For an explanation of the object pool pattern, refer to the Wikipedia page.
Our implementation returns smart pointers that hand the resource back to the pool when the pointer goes out of scope.
This buffer will contain the events read from the CSV file.
cur_cd_buffer_ = cd_buffer_pool_.acquire();
cur_cd_buffer_->clear();
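To make the pool behaviour described above concrete, here is a minimal sketch of an object pool whose smart pointers return objects to the pool when they go out of scope. It is not the SDK's pool class: the name SimplePool and the methods acquire() and available() are hypothetical, and this version returns nullptr when the pool is empty, whereas a bounded pool could instead block until an object is released.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical minimal object pool (illustration only, not the SDK implementation).
template<typename T>
class SimplePool {
    // Shared state outlives the pool if some pointers are still in use.
    struct State {
        std::mutex mutex;
        std::vector<std::unique_ptr<T>> free;
    };

public:
    explicit SimplePool(std::size_t size) : state_(std::make_shared<State>()) {
        assert(size > 0);
        for (std::size_t i = 0; i < size; ++i)
            state_->free.push_back(std::make_unique<T>());
    }

    // Returns a pooled object, or nullptr when every object is currently in use.
    std::shared_ptr<T> acquire() {
        std::unique_ptr<T> obj;
        {
            std::lock_guard<std::mutex> lock(state_->mutex);
            if (state_->free.empty())
                return nullptr;
            obj = std::move(state_->free.back());
            state_->free.pop_back();
        }
        // The custom deleter hands the object back to the pool instead of deleting it.
        std::shared_ptr<State> state = state_;
        return std::shared_ptr<T>(obj.release(), [state](T *p) {
            std::lock_guard<std::mutex> lock(state->mutex);
            state->free.push_back(std::unique_ptr<T>(p));
        });
    }

    // Number of objects currently available in the pool.
    std::size_t available() const {
        std::lock_guard<std::mutex> lock(state_->mutex);
        return state_->free.size();
    }

private:
    std::shared_ptr<State> state_;
};
```

With a pool of this kind, a producer that reassigns its current buffer pointer releases its own reference, and the buffer becomes available again once every consumer has also dropped its copy of the pointer.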
Note that there are four possible types of callbacks that a stage can have:
BaseStage::set_consuming_callback(): used when one of the previous stages has produced data.
BaseStage::set_receiving_callback(): used for receiving generic notifications, for example, when the pipeline is cancelled with Pipeline::cancel().
BaseStage::set_starting_callback(): used when the pipeline starts running.
BaseStage::set_stopping_callback(): used when the pipeline stops running.
In our stage, we only need the starting and stopping callbacks, which are used for managing a thread used by our CSVReadingStage for reading the input file:
// this callback is called once the pipeline is started, so the stage knows it can start producing
set_starting_callback([this]() {
    done_           = false;
    reading_thread_ = std::thread([this] { read(); });
});

// this callback is called once the pipeline is stopped : it can be initiated by a call
// to @ref Pipeline::cancel() and/or after all stages are done and all task queues have been cleared
set_stopping_callback([this]() {
    done_ = true;
    if (reading_thread_.joinable())
        reading_thread_.join();
});
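The start/stop pattern used by these two callbacks can be tried outside the SDK. The sketch below assumes a hypothetical class BackgroundCounter whose start() and stop() methods play the role of the starting and stopping callbacks: an atomic flag tells the worker loop when to exit, and stop() joins the thread so that no work continues after it returns.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical worker illustrating the flag-plus-join pattern (not an SDK class).
class BackgroundCounter {
public:
    // Plays the role of the starting callback: reset the flag, then launch the thread.
    void start() {
        assert(!worker_.joinable()); // start() must not be called twice without stop()
        done_   = false;
        worker_ = std::thread([this] { run(); });
    }

    // Plays the role of the stopping callback: raise the flag, then wait for the thread.
    void stop() {
        done_ = true;
        if (worker_.joinable())
            worker_.join();
    }

    ~BackgroundCounter() {
        stop();
    }

    long iterations() const {
        return iterations_.load();
    }

private:
    // Worker loop: keeps going until stop() raises the flag.
    void run() {
        while (!done_) {
            ++iterations_;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::atomic<bool> done_{false};
    std::atomic<long> iterations_{0};
    std::thread worker_;
};
```

Making the flag atomic matters here because it is written by the thread calling stop() and read by the worker thread; joining in stop() guarantees the worker no longer touches the object once stop() has returned.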
Next, CSVReadingStage::read() is defined. This function is called from the reading_thread_ thread in order to read the content of the file and “produce” data to be fed to the next stages.
void read() {
    std::string line;
    while (!done_ && std::getline(ifs_, line)) {
        // once here, we know that the stage has not been stopped yet,
        // so we read a line and may produce a buffer
        std::istringstream iss(line);
        std::vector<std::string> tokens;
        std::string token;
        while (std::getline(iss, token, ',')) {
            tokens.push_back(token);
        }
        if (tokens.size() != 4) {
            MV_LOG_ERROR() << "Invalid line : <" << line << "> ignored";
        } else {
            if (cur_cd_buffer_->size() > 5000) {
                // this is how a stage produces data to be consumed by the next stages
                produce(cur_cd_buffer_);
                cur_cd_buffer_ = cd_buffer_pool_.acquire();
                cur_cd_buffer_->clear();
            }
            cur_cd_buffer_->emplace_back(static_cast<unsigned short>(std::stoul(tokens[0])),
                                         static_cast<unsigned short>(std::stoul(tokens[1])),
                                         static_cast<short>(std::stoi(tokens[2])), std::stoll(tokens[3]));
        }
    }
    // notifies to the pipeline that this producer has no more data to produce
    complete();
}
As you can see, this function consists of a loop that keeps running as long as the stage has not been stopped and the file still has lines to read:
while (!done_ && std::getline(ifs_, line)) {
// ...
}
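The per-line parsing done inside this loop can be isolated in a standalone helper, which makes it easy to test without a pipeline. The sketch below is an assumption-laden illustration: CsvEvent, split_csv_line and parse_event_line are hypothetical names, the field names x, y, p, t are assumed from the types used in the sample, and unlike the sample it catches the exceptions thrown by std::stoul, std::stoi and std::stoll on malformed fields.

```cpp
#include <cassert>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical plain struct standing in for the SDK event type (field names assumed).
struct CsvEvent {
    unsigned short x;
    unsigned short y;
    short p;
    long long t;
};

// Splits a line on commas, the same way the sample's read() function does.
std::vector<std::string> split_csv_line(const std::string &line) {
    std::istringstream iss(line);
    std::vector<std::string> tokens;
    std::string token;
    while (std::getline(iss, token, ',')) {
        tokens.push_back(token);
    }
    return tokens;
}

// Returns an event, or std::nullopt when the line does not hold four numeric fields.
std::optional<CsvEvent> parse_event_line(const std::string &line) {
    const std::vector<std::string> tokens = split_csv_line(line);
    if (tokens.size() != 4) {
        return std::nullopt;
    }
    try {
        return CsvEvent{static_cast<unsigned short>(std::stoul(tokens[0])),
                        static_cast<unsigned short>(std::stoul(tokens[1])),
                        static_cast<short>(std::stoi(tokens[2])), std::stoll(tokens[3])};
    } catch (const std::exception &) {
        // the std::sto* functions throw on non-numeric or out-of-range input
        return std::nullopt;
    }
}
```

Returning std::optional keeps the caller in charge of how to report invalid lines, for example by logging and skipping them as the sample does.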
In addition to the loop needed to load and parse the CSV file, there are two important function calls: BaseStage::produce() and BaseStage::complete().
A stage calls the BaseStage::produce() function when it has produced data that should be fed to the next stages, passing the data as the function parameter:
// this is how a stage produces data to be consumed by the next stages
produce(cur_cd_buffer_);
Then, the stage calls the BaseStage::complete() function when it has finished producing data, so that the pipeline can stop once the next stages have finished using the data:
// notifies to the pipeline that this producer has no more data to produce
complete();
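The produce-then-complete protocol can be illustrated without the SDK by a small batching helper. In the sketch below, BatchingProducer, push() and finish() are hypothetical names, and the two std::function callbacks stand in for BaseStage::produce() and BaseStage::complete(). One design choice is specific to this sketch and is not taken from the sample: finish() also emits any leftover items before signalling completion.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a producing stage: groups items into batches,
// hands each full batch to a callback and signals when no more data will come.
template<typename T>
class BatchingProducer {
public:
    using Batch = std::vector<T>;

    BatchingProducer(std::size_t max_batch_size, std::function<void(Batch)> on_produce,
                     std::function<void()> on_complete) :
        max_batch_size_(max_batch_size), on_produce_(std::move(on_produce)), on_complete_(std::move(on_complete)) {
        assert(max_batch_size_ > 0);
    }

    // Adds one item, emitting the current batch first if it is already full.
    void push(const T &item) {
        if (batch_.size() >= max_batch_size_) {
            flush();
        }
        batch_.push_back(item);
    }

    // Emits the remaining items (choice made in this sketch), then signals completion.
    void finish() {
        flush();
        on_complete_();
    }

private:
    void flush() {
        if (batch_.empty()) {
            return;
        }
        on_produce_(std::move(batch_));
        batch_ = Batch(); // reset the moved-from vector to a known empty state
    }

    std::size_t max_batch_size_;
    std::function<void(Batch)> on_produce_;
    std::function<void()> on_complete_;
    Batch batch_;
};
```

Producing in batches rather than item by item reduces the number of hand-offs between stages, and the completion signal lets downstream consumers know when they can stop waiting for more data.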
Connecting Stages into a Pipeline
Now, moving on to the sample’s main function, where the stages are connected to the pipeline:
int main(int argc, char *argv[]) {
    std::string in_csv_file_path;
    int width, height;

    const std::string program_desc(
        "Code sample demonstrating how to use Metavision SDK to display events from a CSV file.\n");

    po::options_description options_desc("Options");
    // clang-format off
    options_desc.add_options()
        ("help,h", "Produce help message.")
        ("input-csv-file,i", po::value<std::string>(&in_csv_file_path)->required(), "Path to input CSV file")
        ("width",            po::value<int>(&width)->default_value(1280), "Width of the sensor associated to the CSV file")
        ("height",           po::value<int>(&height)->default_value(720), "Height of the sensor associated to the CSV file")
        ;
    // clang-format on

    po::variables_map vm;
    try {
        po::store(po::command_line_parser(argc, argv).options(options_desc).run(), vm);
        po::notify(vm);
    } catch (po::error &e) {
        MV_LOG_ERROR() << program_desc;
        MV_LOG_ERROR() << options_desc;
        MV_LOG_ERROR() << "Parsing error:" << e.what();
        return 1;
    }

    if (vm.count("help")) {
        MV_LOG_INFO() << program_desc;
        MV_LOG_INFO() << options_desc;
        return 0;
    }

    /// Pipeline
    //
    //  0 (Csv Reading) -->-- 1 (Frame Generation) -->-- 2 (Display)
    //

    // A pipeline for which all added stages will automatically be run in their own processing threads (if applicable)
    Metavision::Pipeline p(true);

    // 0) Stage producing events from a CSV file
    auto csv_reader = std::make_unique<CSVReadingStage>(in_csv_file_path);
    if (auto width_opt = csv_reader->get_width()) {
        width = *width_opt;
    }
    if (auto height_opt = csv_reader->get_height()) {
        height = *height_opt;
    }
    auto &csv_stage = p.add_stage(std::move(csv_reader));

    // 1) Stage generating a frame with events previously produced using accumulation time of 10ms
    auto &frame_stage =
        p.add_stage(std::make_unique<Metavision::FrameGenerationStage>(width, height, 10), csv_stage);

    // 2) Stage displaying the generated frame
    auto &disp_stage =
        p.add_stage(std::make_unique<Metavision::FrameDisplayStage>("CD events", width, height), frame_stage);

    // Run the pipeline and wait for its completion
    p.run();

    return 0;
}
First, the Pipeline is instantiated using the Pipeline::Pipeline(bool auto_detach) constructor:
// A pipeline for which all added stages will automatically be run in their own processing threads (if applicable)
Metavision::Pipeline p(true);
The auto_detach argument indicates whether the pipeline should automatically call BaseStage::detach() on detachable stages in the pipeline.
Then, the first stage is added to the pipeline using template<typename Stage> Stage &Pipeline::add_stage(std::unique_ptr<Stage> &&stage):
// 0) Stage producing events from a CSV file
auto csv_reader = std::make_unique<CSVReadingStage>(in_csv_file_path);
auto &csv_stage = p.add_stage(std::move(csv_reader));
The following stages are added with the other overload of the add_stage function, template<typename Stage> Stage &Pipeline::add_stage(std::unique_ptr<Stage> &&stage, BaseStage &prev_stage), which calls BaseStage::set_previous_stage right after adding the stage to the pipeline:
// 1) Stage generating a frame with events previously produced using accumulation time of 10ms
auto &frame_stage =
    p.add_stage(std::make_unique<Metavision::FrameGenerationStage>(width, height, 10), csv_stage);
// 2) Stage displaying the generated frame
auto &disp_stage =
    p.add_stage(std::make_unique<Metavision::FrameDisplayStage>("CD events", width, height), frame_stage);
Running the Pipeline
Finally, once the pipeline is set up, it is started by calling Pipeline::run() from the main thread. This call keeps running until completion or until the pipeline is cancelled.
p.run();