Template Class pipeline
Defined in File pipeline.hpp
Class Documentation
template<typename T>
class concore::v1::pipeline
Implements a pipeline, used to add concurrency to the processing of items that go through several stages with various ordering constraints.
A pipeline is a sequence of stages in the processing of items (lines). All items share the same stages, and all stages operate on the same type (the line type).
- Template Parameters
T: The type of items that flow through the pipeline.
The pipeline is built with the help of the pipeline_builder class.
Even though the stages of one line must run in sequence, this structure still yields concurrency, because multiple lines can be executed concurrently. However, most pipelines also add further constraints on the execution of lines.
The first constraint that one might add is the number of lines that can be processed concurrently. This is done by a parameter passed to the constructor.
The other way to constrain the pipeline is to add ordering constraints to its stages. Each stage can have one of the following ordering constraints:
- in_order: at most one item can be processed in the stage at a time, and the items are processed in the order in which they were added to the pipeline
- out_of_order: at most one item can be processed in the stage at a time, but the order of processing doesn't matter
- concurrent: no constraints are set; the items can run concurrently in the stage
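The difference between the three modes can be illustrated with a small standalone model. This is only a sketch that does not use concore: the `ordering` enum and the functions `max_lines_in_stage` and `processing_order` are hypothetical names introduced here, and the model says nothing about how concore implements the constraints internally. Given the order in which lines reach a stage, it reports the order in which the stage would start them.

```cpp
#include <cassert>
#include <set>
#include <vector>

// Hypothetical model of the three modes; the names only mirror the documentation.
enum class ordering { in_order, out_of_order, concurrent };

// Largest number of lines allowed inside one stage at a time (0 = no limit).
int max_lines_in_stage(ordering ord) {
    return ord == ordering::concurrent ? 0 : 1;
}

// `arrivals` holds the push-order index (0, 1, 2, ...) of each line, listed in
// the order in which the lines reach the stage. Returns the order in which the
// stage would start processing them.
std::vector<int> processing_order(ordering ord, const std::vector<int>& arrivals) {
    if (ord != ordering::in_order)
        return arrivals; // lines are taken as they arrive

    std::vector<int> started;
    std::set<int> held; // lines that arrived before their turn
    int next = 0;       // push-order index of the line allowed to start next
    for (int line : arrivals) {
        held.insert(line);
        // Release every held line whose turn has come.
        while (!held.empty() && *held.begin() == next) {
            started.push_back(next);
            held.erase(held.begin());
            ++next;
        }
    }
    return started;
}
```

In this model, lines arriving as 2, 0, 1, 3 are started as 0, 1, 2, 3 by an in_order stage, while an out_of_order or concurrent stage starts them as 2, 0, 1, 3. The two non-concurrent modes differ only in ordering; both admit a single line at a time.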
The in_order type adds the most constraints to a stage; out_of_order is a bit more flexible, but still limits the concurrency a lot. The most relaxed mode is concurrent.
If a pipeline has only in_order stages, then the concurrency of the pipeline can grow only up to the number of stages it has, so it is typically very limited. We gain concurrency if we can make some of the stages concurrent. A pipeline scales well with respect to concurrency if most of its processing happens in concurrent stages.
If a stage throws an exception while processing a line, the following stages are not run for that line. If some of the following stages are in_order, the next items are not blocked by the stage that was not executed; i.e., the processing in that stage is simply skipped.
Example of building a pipeline:
    auto my_pipeline = concore::pipeline_builder<int>()
        | concore::stage_ordering::concurrent
        | [&](int idx) { work1(idx); }
        | concore::stage_ordering::out_of_order
        | [&](int idx) { work2(idx); }
        | concore::pipeline_end;
    for (int i = 0; i < 100; i++)
        my_pipeline.push(i);
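The scaling remark in the class description (a pipeline scales well when most of its processing happens in concurrent stages) and the effect of limiting the number of concurrently processed lines can be checked with a rough timing model. The sketch below is not concore code: `stage_model` and `makespan` are hypothetical names, and the model assumes that all lines are pushed at time 0, that worker threads are unlimited, that every non-concurrent stage takes lines in push order, and that `max_lines` is at least 1.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical description of one stage, used only by this model.
struct stage_model {
    bool serial;  // true for in_order / out_of_order, false for concurrent
    int duration; // ticks needed to process one line
};

// Time at which the last of `num_lines` lines leaves the pipeline when at most
// `max_lines` lines may be in flight at once.
int makespan(const std::vector<stage_model>& stages, int num_lines, int max_lines) {
    std::vector<int> stage_free(stages.size(), 0); // when each serial stage is idle again
    std::vector<int> done;                         // when each line leaves the pipeline
    for (int i = 0; i < num_lines; ++i) {
        // A line is admitted once the line `max_lines` places ahead of it has left.
        int t = (i >= max_lines) ? done[i - max_lines] : 0;
        for (std::size_t s = 0; s < stages.size(); ++s) {
            if (stages[s].serial) {
                t = std::max(t, stage_free[s]); // wait until the stage is idle
                stage_free[s] = t + stages[s].duration;
            }
            t += stages[s].duration;
        }
        done.push_back(t);
    }
    return done.empty() ? 0 : *std::max_element(done.begin(), done.end());
}
```

Under these assumptions, ten lines going through stages of 1, 8 and 1 ticks need 82 ticks when every stage is serial and 19 ticks when the long middle stage is concurrent. Capping the lines in flight at 2 raises the latter to 51 ticks.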
- See
pipeline_builder, stage_ordering, serializer, n_serializer
Public Functions
void push(T line_data)
Pushes a new item (line) through the pipeline.
This will start processing from the first stage and will iteratively pass through all the stages of the pipeline. The same line data is passed to the functors registered with each stage of the pipeline; i.e., all the stages of the pipeline work on the same line.
- Parameters
line_data: The data associated with the line
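The behaviour of push, together with the exception rule from the class description, can be pictured with a single-threaded stand-in. This is a minimal sketch and not the concore implementation: `sequential_pipeline_model`, `add_stage` and `demo_log` are hypothetical names, the stages run on the calling thread, and the exception is silently dropped here because the text above does not say how the real pipeline reports it.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in: runs the stages one after another on the caller's thread.
template <typename T>
class sequential_pipeline_model {
public:
    using stage_fun = std::function<void(T)>;

    void add_stage(stage_fun f) { stages_.push_back(std::move(f)); }

    // Passes the same line data to every stage, first to last.
    // Returns the number of stages that completed for this line.
    std::size_t push(T line_data) {
        std::size_t completed = 0;
        try {
            for (auto& stage : stages_) {
                stage(line_data);
                ++completed;
            }
        } catch (...) {
            // The remaining stages are skipped for this line only;
            // later lines are unaffected.
        }
        return completed;
    }

private:
    std::vector<stage_fun> stages_;
};

// Pushes lines 0, 1, 2 through three stages; the second stage fails for line 1.
std::vector<std::string> demo_log() {
    std::vector<std::string> log;
    sequential_pipeline_model<int> p;
    p.add_stage([&](int i) { log.push_back("a" + std::to_string(i)); });
    p.add_stage([&](int i) {
        if (i == 1) throw std::runtime_error("bad line");
        log.push_back("b" + std::to_string(i));
    });
    p.add_stage([&](int i) { log.push_back("c" + std::to_string(i)); });
    for (int i = 0; i < 3; ++i)
        p.push(i);
    return log;
}
```

In this model, `demo_log()` records a0, b0, c0, a1, a2, b2, c2: line 1 stops after its first stage, and line 2 still passes through all three.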