graph_node、receiver<Input>、sender<Output> のテンプレート・クラス。受け取ったメッセージのユーザー定義ボディーを実行します。ボディーは、インテル® TBB スレッドプール外で処理するように、入力メッセージを外部処理に渡します。このノードは、外部処理とフローグラフの通信を可能にする async_gateway_type インターフェイスも提供します。
template < typename Input, typename Output, typename Allocator = cache_aligned_allocator<Input> > class async_node;
#define TBB_PREVIEW_FLOW_GRAPH_NODES 1 #include "tbb/flow_graph.h"
async_node を利用することで、フローグラフとユーザーまたは別のランタイムによって管理される外部処理の通信が可能です。このノードは Input 型のメッセージを受け取り、メッセージを外部処理に渡すためボディーを呼び出します。結果のリターンと async_node の出力ポートへの push には、async_gateway_type を利用します。
async_node は、同時に実行するボディーを無制限に呼び出すことができます。
Input と Output は、コピー構築可能および代入可能でなければなりません。
async_node のボディーコンセプトを次に示します。
擬似署名 |
意味 |
---|---|
B::B( const B& ) |
コピー・コンストラクター。 |
B::~B() |
デストラクター。 |
void operator=( const B& ) |
代入。 |
void B::operator()(const input_type &v, async_gateway_type &gateway ) |
入力値 v は外部処理に渡されます。gateway インターフェイスにより、外部処理とエンクロージング・フローグラフの通信が可能です。 |
async_node に渡されるボディー・オブジェクトはコピーされます。そのため、メンバー変数を更新してもノードの構築に使用されたオリジナルのオブジェクトには影響しません。ノードの外部からボディー・オブジェクト内に保持されている状態を確認する場合は、copy_body 関数を使用して更新されたコピーを取得します。
ボディー・オブジェクトは、そのエンクロージング・グラフをスローまたはキャンセルすることがあります。詳細は、「task_group_context」および「例外」を参照してください。
次に async_gateway_type インターフェイスを示します。
擬似署名 |
意味 |
---|---|
bool async_try_put( const output_type &v ) |
v を async_node のすべての後継にブロードキャストします。 |
void async_reserve() |
作業が外部処理に渡されたことをフローグラフに通知します。 |
void async_commit() |
外部処理に渡された作業が完了したことをフローグラフに通知します。 |
外部処理に渡される入力メッセージごとに async_gateway_type::async_reserve() メソッドを呼び出す必要はありません。各 async_gateway_type::async_reserve() 呼び出しには、対応する async_gateway_type::async_commit() 呼び出しが必要です。
graph::wait_for_all() は、対応する async_commit() のない async_reserve() が存在する間は終了しません。
次のサンプルで async_node は、ユーザースレッドによって処理されるように、AsyncActivity に作業を渡します。
#include "tbb/flow_graph.h" #include "tbb/concurrent_queue.h" #include <thread> using namespace tbb::flow; typedef int input_type; typedef int output_type; typedef tbb::flow::async_node<input_type, output_type=""> async_node_type; class AsyncActivity { public: typedef async_node_type::async_gateway_type async_gateway_type; struct work_type { input_type input; async_gateway_type* gateway; }; AsyncActivity() : service_thread( [this]() { while( !end_of_work() ) { work_type w; while( my_work_queue.try_pop(w) ) { output_type result = do_work( w.input ); // 結果をグラフに返す w.gateway->async_try_put( result ); // 作業完了を通知する w.gateway->async_commit(); } } } ) {} void submit( input_type i, async_gateway_type* gateway ) { work_type w = {i, gateway}; gateway->async_reserve(); my_work_queue.push(w); } private: bool end_of_work() { // スレッドを終了すべきであることを示す } output_type do_work(input_type& v) { // 入力を処理し、出力に変換する } tbb::concurrent_queue<work_type> my_work_queue; std::thread service_thread; }; int main() { AsyncActivity async_activity; tbb::flow::graph g; async_node_type async_consumer( g, // 非同期処理を初期化するユーザー・ファンクター [&] (input_type input, async_node_type::async_gateway_type& gateway ) { async_activity.submit( input, &gateway ); } ); tbb::flow::source_node<input_type> s( g, [](input_type& v)->bool { /* 非同期処理用のデータを生成する */ } ); tbb::flow::function_node<output_type> f( g, unlimited, [](const output_type& v) { /* 非同期処理からからデータを取得する */ } ); tbb::flow::make_edge( s, async_consumer ); tbb::flow::make_edge( async_consumer, f ); g.wait_for_all(); }
namespace tbb { namespace flow { template < typename Input, typename Output, typename Allocator = cache_aligned_allocator<Input> > class async_node : public graph_node, public receiver<Input>, public sender<Output> { public: template <typename Body> async_node( graph& g, Body body ); async_node( const async_node& src ); typedef implementation-dependent async_gateway_type; async_gateway_type& async_gateway(); // receiver<Input> typedef Input input_type; typedef sender<input_type> predecessor_type; bool try_put( const input_type& v ); bool register_predecessor( predecessor_type& p ); bool remove_predecessor( predecessor_type& p ); // sender<Output> typedef Output output_type; typedef receiver<output_type> successor_type; bool register_successor( successor_type& r ); bool remove_successor( successor_type& r ); bool try_get( output_type& v ); bool try_reserve( output_type& v ); bool try_release(); bool try_consume(); }; } }
メンバー | 説明 |
---|---|
template<typename Body> async_node(graph& g, Body body) |
body のコピーを呼び出す async_node を構築します。 |
async_node( const async_node& src ) |
src の構築時の状態と同じ初期状態で async_node を構築します。構築される async_node には、src と同じ graph オブジェクトへの参照と src で使用される初期 body のコピーが含まれます。src のプレデセッサーおよびサクセサーはコピーされません。 警告新しいボディー・オブジェクトは、src の構築時に提供されたオリジナルのボディーのコピーからコピー構築されます。このため、src の構築後に src のボディーのメンバー変数に対して行われた変更は、新しい function_node のボディーに影響しません。 |
async_gateway_type& async_gateway() |
async_gateway_type インターフェイスへの参照を返します。 |
bool try_put( const input_type& v ) |
body(v) を実行するタスクがスポーンされます。 戻り値: 常に true。v を 外部処理に渡すのは body の責任です。body によってメッセージが適切に処理されなかった場合、そのメッセージは失われます。 |
bool try_get( output_type& v ) |
async_node に出力のバッファーは含まれていないため、try_get 呼び出しは常に拒否されます。 戻り値: false。 |
bool try_reserve( output_type& v ) |
async_node に出力のバッファーは含まれていないため、予約することはできません。 戻り値: false。 |
bool try_release( ) |
async_node に出力のバッファーは含まれていないため、予約することはできません。 戻り値: false。 |
bool try_consume( ) |
async_node に出力のバッファーは含まれていないため、予約することはできません。 戻り値: false。 |