Program Listing for File pipeline.hpp
Return to documentation for file (include/concore/pipeline.hpp)
#pragma once
#include "concore/task_group.hpp"
#include "concore/any_executor.hpp"
#include <functional>
#include <memory>
#include <utility>
namespace concore {
inline namespace v1 {
// Ordering constraint attached to a pipeline stage; passed to pipeline_builder::add_stage
// and to pipeline_builder::operator|.
//
// NOTE(review): the scheduling behaviour behind each value is implemented outside this
// header; the per-value notes below are read off the enumerator names -- confirm them
// against the implementation.
enum class stage_ordering {
    // Presumably: lines go through the stage in the order in which they were pushed.
    in_order = 0,
    // Presumably: lines go through the stage one at a time, in no particular order.
    out_of_order = 1,
    // Presumably: no restriction; several lines may be in the stage at the same time.
    concurrent = 2
};
} // namespace v1
namespace detail {
// Forward declaration only; the full definition is in the .cpp file.
// Within this header the type is used solely as the pointee of a std::shared_ptr
// (see pipeline_impl::data_), so an incomplete type is sufficient here.
struct pipeline_data;
// Payload-independent bookkeeping for one line (one pushed item) of a pipeline.
// typed_line<T> derives from this and adds the actual payload.
//
// stage_idx_ (31 bits) and stopped_ (1 bit) are bit-fields that add up to exactly the
// width of a 32-bit int, so they are meant to share one storage unit.
struct line_base {
    // Stage index of this line; starts at 0.
    // NOTE(review): presumably the index of the stage the line is currently at; it is
    // only written by code outside this header -- confirm there.
    int stage_idx_ : 31;

    // Stop flag; starts at 0 (not stopped).
    // Deliberately unsigned: a *signed* 1-bit field can represent only 0 and -1, so
    // storing 1 would read back as -1 and a check such as `stopped_ == 1` could never
    // succeed. With an unsigned field the values are the expected 0 and 1.
    unsigned int stopped_ : 1;

    // Ordering index of this line; starts at 0.
    // NOTE(review): presumably the sequence number used by ordered stages -- confirm
    // against the implementation.
    int order_idx_{0};

    // Bit-fields cannot carry default member initializers before C++20, so they are
    // zeroed in the constructor instead.
    line_base()
        : stage_idx_(0)
        , stopped_(0) {}
};
template <typename T>
struct typed_line : line_base {
T data_;
explicit typed_line(T line_data)
: data_(std::move(line_data)) {}
};
// Type-erased form of a stage: a callable that receives a raw pointer to the line it
// should process and returns nothing. create_stage_fun produces values of this type and
// pipeline_impl::do_add_stage consumes them.
using stage_fun = std::function<void(line_base*)>;
// Adapts a user callable that operates on a T into a type-erased stage_fun.
//
//   T    - payload type of the lines; must be given explicitly by the caller
//   work - the user callable; it is forwarded (copied or moved) into the returned closure
//          and invoked as `work(T&)`. The closure is not mutable, so `work` is invoked
//          through a const access path.
//
// The returned function downcasts the incoming line_base* to typed_line<T>* with a
// static_cast, i.e. without any runtime type check; it must therefore only ever be
// called with lines that really are typed_line<T>.
template <typename T, typename F>
inline stage_fun create_stage_fun(F&& work) {
    using line_type = detail::typed_line<T>;
    return [work = std::forward<F>(work)](detail::line_base* line) {
        // Recover the concrete line type and hand only its payload to the user code.
        work(static_cast<line_type*>(line)->data_);
    };
}
// Non-template core used by both pipeline<T> and pipeline_builder<T>.
//
// This header only declares the members; all of them are defined out of line. The actual
// state lives in a pipeline_data object (incomplete in this header) held via shared_ptr.
struct pipeline_impl {
    // --- construction ---------------------------------------------------------------
    // Every constructor takes max_concurrency; the task_group and/or any_executor are
    // optional extras. pipeline_builder<T> forwards its own constructor arguments here.
    // NOTE(review): max_concurrency is presumably the limit on lines processed at the
    // same time -- confirm against the implementation.
    explicit pipeline_impl(int max_concurrency);
    pipeline_impl(int max_concurrency, any_executor exe);
    pipeline_impl(int max_concurrency, task_group grp);
    pipeline_impl(int max_concurrency, task_group grp, any_executor exe);

    // --- copy / move / destroy (all user-declared, defined out of line) ---------------
    pipeline_impl(const pipeline_impl&);
    pipeline_impl(pipeline_impl&&) noexcept;
    pipeline_impl& operator=(const pipeline_impl&);
    pipeline_impl& operator=(pipeline_impl&&) noexcept;
    ~pipeline_impl();

    // --- operations -------------------------------------------------------------------
    // Called by pipeline_builder<T>::add_stage and operator| with the ordering and the
    // type-erased stage function of a new stage.
    void do_add_stage(stage_ordering ord, stage_fun&& f);
    // Called by pipeline<T>::push with a freshly allocated line.
    void do_start_line(std::shared_ptr<line_base>&& line);

    // --- state ------------------------------------------------------------------------
    // Shared handle to the implementation data defined in the .cpp file.
    std::shared_ptr<pipeline_data> data_;
};
} // namespace detail
inline namespace v1 {
template <typename T>
class pipeline_builder;

// Handle to a built pipeline whose lines carry values of type T.
//
// A pipeline cannot be constructed directly: its only constructor is private and
// pipeline_builder<T> is the sole friend, so instances come from the builder.
template <typename T>
class pipeline {
public:
    // Starts a new line carrying line_data.
    //
    // line_data is taken by value and then *moved* into a heap-allocated
    // detail::typed_line<T>, which is handed to the implementation. Moving (rather than
    // copying the by-value parameter a second time) avoids a redundant copy of T and
    // lets move-only payload types be pushed.
    void push(T line_data) {
        impl_.do_start_line(std::make_shared<detail::typed_line<T>>(std::move(line_data)));
    }

private:
    detail::pipeline_impl impl_;

    friend pipeline_builder<T>;

    // Takes over the implementation object prepared by the builder. Marked explicit so
    // that a pipeline_impl never converts to a pipeline implicitly.
    explicit pipeline(detail::pipeline_impl&& impl)
        : impl_(std::move(impl)) {}
};
// Empty tag type selecting the pipeline_builder<T>::operator| overload that finishes the
// builder chain and returns the pipeline.
struct pipeline_end_t {};
// The tag value to pipe into a builder: `builder | ... | pipeline_end`.
constexpr pipeline_end_t pipeline_end{};
template <typename T>
class pipeline_builder {
public:
explicit pipeline_builder(int max_concurrency = 0xffff)
: impl_(max_concurrency) {}
pipeline_builder(int max_concurrency, task_group grp)
: impl_(max_concurrency, std::move(grp)) {}
pipeline_builder(int max_concurrency, task_group grp, any_executor exe)
: impl_(max_concurrency, std::move(grp), std::move(exe)) {}
pipeline_builder(int max_concurrency, any_executor exe)
: impl_(max_concurrency, std::move(exe)) {}
template <typename F>
pipeline_builder& add_stage(stage_ordering ord, F&& work) {
impl_.do_add_stage(ord, detail::create_stage_fun<T>(std::forward<F>(work)));
return *this;
}
operator pipeline<T>() { return pipeline<T>(std::move(impl_)); }
pipeline<T> build() { return pipeline<T>(std::move(impl_)); }
pipeline_builder& operator|(stage_ordering ord) {
next_ordering_ = ord;
return *this;
}
template <typename F>
pipeline_builder& operator|(F&& work) {
impl_.do_add_stage(next_ordering_, detail::create_stage_fun<T>(std::forward<F>(work)));
return *this;
}
pipeline<T> operator|(pipeline_end_t) {
return pipeline<T>(std::move(impl_));
;
}
private:
detail::pipeline_impl impl_;
stage_ordering next_ordering_{stage_ordering::in_order};
};
} // namespace v1
} // namespace concore