Template Class pipeline

Class Documentation

template<typename T>
class concore::v1::pipeline

Implements a pipeline, used to add concurrency to processing of items that need several stages of processing with various ordering constraints.

A pipeline is a sequence of stages in a the processing of a items (lines). All items share the same stages. All the stages operate on the same type (line type).

Template Parameters
  • T: The type of items that flow through the pipeline.

The pipeline is built with the help of the pipeline_builder class.

Even if the stages need to be run in sequence, we might get concurrency out of this structure as we can execute multiple lines concurrently. However, most of the pipelines also add more constraints on the execution of lines.

The first constraint that one might add is the number of lines that can be processed concurrently. This is done by a parameter passed to the constructor.

The other way to constraint the pipeline is to add various ordering constraints for the stages. We might have multiple ordering constraints for stages:

  • in_order: at most one item can be processed in the stage, and the items are executed in the order in which they are added to the pipeline

  • out_of_order: at most one item can be processed in the stage, but the order of processing doesn’t matter

  • concurrent: no constraints are set; the items can run concurrently on the stage

The in_order type adds the more constraints to a stage; out_of_order is a bit more flexible, but still limits the concurrency a lot. The most relaxed mode is concurrent.

If a pipeline has only in_order stages, then the concurrency of the pipeline grows to the number of stages it has; but typically the concurrency is very limited. We gain concurrency if we can make some of the stages concurrent. A pipeline scales well with respect to concurrency if most of its processing is happening in concurrent stages.

If a stage processing throws an exception, then, for that particular line, the next stages will not be run. If some of the next stages are in_order then the next items will not be blocked by not executing this stage; i.e., the processing in the stage is just skipped.

Example of building a pipeline:

auto my_pipeline = concore::pipeline_builder<int>()
    | concore::stage_ordering::concurrent
    | [&](int idx) {
        work1(idx);
    }
    | concore::stage_ordering::out_of_order
    | [&](int idx) {
        work2(idx);
    }
    | concore::pipeline_end;
for ( int i=0; i<100; i++)
    my_pipeline.push(i);

See

pipeline_builder, stage_ordering, serializer, n_serializer

Public Functions

void push(T line_data)

Pushes a new item (line) through the pipeline.

This will start processing from the first stage and will iteratively pass through all the stages of the pipeline. The same line data is passed to the functors registered with each stage of the pipeline; i.e., all the stages of the pipeline work on the same line.

Parameters
  • line_data: The data associated with the line