Program Listing for File pipeline.hpp

Return to documentation for file (include/concore/pipeline.hpp)

#pragma once

#include "concore/task_group.hpp"
#include "concore/any_executor.hpp"

#include <memory>
#include <functional>

namespace concore {

inline namespace v1 {

/// Ordering constraint attached to a pipeline stage when it is added
/// (see pipeline_builder::add_stage and pipeline_builder::operator|).
///
/// NOTE(review): the scheduling behaviour behind each value is implemented in the
/// .cpp file, not in this header; the descriptions below follow the enumerator
/// names only and should be confirmed against the implementation.
enum class stage_ordering {
    /// Presumably: lines go through the stage in the order they were pushed.
    in_order,
    /// Presumably: lines go through the stage one at a time, in any order.
    out_of_order,
    /// Presumably: no ordering constraint; lines may be processed concurrently.
    concurrent,
};

} // namespace v1

namespace detail {
// Forward declaration; see .cpp file
struct pipeline_data;

/// Type-independent bookkeeping header of a pipeline line.
///
/// typed_line<T> derives from this struct and adds the user payload; the pipeline
/// implementation only ever handles lines through line_base pointers.
struct line_base {
    /// Stage index of the line, packed into 31 bits; starts at 0.
    /// NOTE(review): presumably the index of the stage the line is currently at —
    /// it is only updated in the .cpp file; confirm there.
    int stage_idx_ : 31;
    /// One-bit flag; starts at 0 (cleared).
    /// Declared unsigned on purpose: a signed one-bit bit-field can only represent
    /// 0 and -1, so storing 1 would read back as -1 and `stopped_ == 1` would never
    /// hold. As `unsigned` it stores exactly 0 or 1 and still shares the 32-bit
    /// storage unit with stage_idx_.
    unsigned stopped_ : 1;
    /// Ordering index of the line; starts at 0.
    /// NOTE(review): presumably used by the ordered stages — confirm in the .cpp.
    int order_idx_{0};

    /// Creates a line header with every field zeroed. Bit-fields cannot take default
    /// member initializers before C++20, hence the explicit initializer list.
    line_base()
        : stage_idx_(0)
        , stopped_(0) {}
};

template <typename T>
struct typed_line : line_base {
    T data_;

    explicit typed_line(T line_data)
        : data_(std::move(line_data)) {}
};

/// Type-erased stage callable: receives the line as a line_base pointer.
using stage_fun = std::function<void(line_base*)>;

/// Wraps a user functor operating on `T&` into a type-erased stage_fun.
///
/// @tparam T     Payload type of the pipeline; the line passed to the returned
///               function must really be a typed_line<T>, as it is downcast with
///               static_cast (no runtime check).
/// @tparam F     Type of the user functor.
/// @param  work  Functor invoked as `work(data)` with the line's payload; it is
///               forwarded into the closure (copied from lvalues, moved from rvalues).
/// @return A stage_fun owning the functor.
///
/// The closure's call operator is const, so `work` must be callable on a const
/// functor object.
template <typename T, typename F>
inline stage_fun create_stage_fun(F&& work) {
    return [fn = std::forward<F>(work)](line_base* base) {
        typed_line<T>* typed = static_cast<typed_line<T>*>(base);
        fn(typed->data_);
    };
}

/// Non-template implementation handle behind pipeline<T> and pipeline_builder<T>.
///
/// All member functions are only declared here; they are defined out of line
/// (pipeline_data is an incomplete type in this header), so their exact behaviour
/// must be checked in the .cpp file.
struct pipeline_impl {
    /// Shared pointer to the implementation state (opaque in this header).
    std::shared_ptr<pipeline_data> data_;

    // Constructors mirror the pipeline_builder<T> overloads, which forward their
    // arguments here unchanged: a concurrency limit, optionally a task group and/or
    // an executor.
    // NOTE(review): max_concurrency presumably bounds the number of lines in flight;
    // confirm against the implementation.
    explicit pipeline_impl(int max_concurrency);
    pipeline_impl(int max_concurrency, task_group grp);
    pipeline_impl(int max_concurrency, task_group grp, any_executor exe);
    pipeline_impl(int max_concurrency, any_executor exe);
    ~pipeline_impl();

    // Move operations (declared noexcept).
    pipeline_impl(pipeline_impl&&) noexcept;
    pipeline_impl& operator=(pipeline_impl&&) noexcept;

    // Copy operations.
    // NOTE(review): with data_ being a shared_ptr, copies presumably share the same
    // pipeline_data — confirm in the .cpp.
    pipeline_impl(const pipeline_impl&);
    pipeline_impl& operator=(const pipeline_impl&);

    /// Registers a stage with ordering `ord` and the type-erased stage function `f`.
    /// Called by pipeline_builder<T>::add_stage and its operator| overload.
    void do_add_stage(stage_ordering ord, stage_fun&& f);

    /// Hands a freshly created line to the pipeline. Called by pipeline<T>::push.
    void do_start_line(std::shared_ptr<line_base>&& line);
};

} // namespace detail

inline namespace v1 {

template <typename T>
class pipeline_builder;

/// A built pipeline processing lines whose payload is of type T.
///
/// Instances cannot be constructed directly: the only constructor is private and
/// pipeline_builder<T> is the sole friend, so a pipeline is obtained from a builder
/// (build(), the conversion operator, or `builder | pipeline_end`).
template <typename T>
class pipeline {
public:
    /// Starts a new line through the pipeline.
    ///
    /// @param line_data  Payload of the new line. It is taken by value and then moved
    ///                   into the heap-allocated line, so passing an rvalue costs no
    ///                   copy and move-only payload types are accepted.
    void push(T line_data) {
        // line_data is a sink parameter: without std::move it would be copied a
        // second time into typed_line<T>'s constructor argument.
        impl_.do_start_line(std::make_shared<detail::typed_line<T>>(std::move(line_data)));
    }

private:
    /// Implementation handle taken over from the builder.
    detail::pipeline_impl impl_;

    friend pipeline_builder<T>;

    /// Takes ownership of the builder's implementation object.
    pipeline(detail::pipeline_impl&& impl)
        : impl_(std::move(impl)) {}
};

/// Empty tag type selecting pipeline_builder<T>::operator|(pipeline_end_t), which
/// finishes a builder expression and yields the pipeline.
struct pipeline_end_t {};

/// Tag value to terminate a builder chain, e.g. `builder | stage | pipeline_end`.
constexpr pipeline_end_t pipeline_end{};

/// Fluent builder for pipeline<T>.
///
/// Stages are appended either with add_stage() or with the `|` operator:
/// `builder | stage_ordering::concurrent | f1 | f2 | pipeline_end`. A stage_ordering
/// piped into the builder applies to every stage piped after it, until another
/// ordering is piped; the initial ordering is stage_ordering::in_order.
///
/// Producing the pipeline (build(), the conversion operator or `| pipeline_end`)
/// moves the implementation object out of the builder, so the builder should be
/// treated as consumed afterwards.
template <typename T>
class pipeline_builder {
public:
    // The constructor arguments are forwarded unchanged to detail::pipeline_impl.
    // NOTE(review): max_concurrency presumably limits the number of lines processed
    // at the same time — confirm against the implementation.

    /// Builder with only a concurrency limit (defaults to 0xffff).
    explicit pipeline_builder(int max_concurrency = 0xffff)
        : impl_(max_concurrency) {}
    /// Builder with a concurrency limit and a task group.
    pipeline_builder(int max_concurrency, task_group grp)
        : impl_(max_concurrency, std::move(grp)) {}
    /// Builder with a concurrency limit, a task group and an executor.
    pipeline_builder(int max_concurrency, task_group grp, any_executor exe)
        : impl_(max_concurrency, std::move(grp), std::move(exe)) {}
    /// Builder with a concurrency limit and an executor.
    pipeline_builder(int max_concurrency, any_executor exe)
        : impl_(max_concurrency, std::move(exe)) {}

    /// Appends a stage.
    ///
    /// @param ord   Ordering constraint of the stage.
    /// @param work  Functor invoked as `work(data)` with the payload (`T&`) of a line.
    /// @return *this, to allow chaining.
    template <typename F>
    pipeline_builder& add_stage(stage_ordering ord, F&& work) {
        impl_.do_add_stage(ord, detail::create_stage_fun<T>(std::forward<F>(work)));
        return *this;
    }

    /// Implicit conversion to the finished pipeline; same as build().
    operator pipeline<T>() { return build(); }

    /// Produces the pipeline, moving the implementation out of this builder.
    pipeline<T> build() { return pipeline<T>(std::move(impl_)); }

    /// Sets the ordering used by the stages subsequently added with `| functor`.
    pipeline_builder& operator|(stage_ordering ord) {
        next_ordering_ = ord;
        return *this;
    }

    /// Appends a stage using the most recently piped ordering; see add_stage().
    template <typename F>
    pipeline_builder& operator|(F&& work) {
        return add_stage(next_ordering_, std::forward<F>(work));
    }

    /// Ends the builder expression; same as build().
    pipeline<T> operator|(pipeline_end_t) { return build(); }

private:
    /// Implementation object being configured; moved into the pipeline on build.
    detail::pipeline_impl impl_;
    /// Ordering applied to stages added through `operator|(F&&)`.
    stage_ordering next_ordering_{stage_ordering::in_order};
};

} // namespace v1
} // namespace concore