async_msg は、インテル® TBB のフローグラフと (スレッドやデバイスのような) 外部並列処理の間の相互通信を実装するために使用される特別なメッセージクラスです。
template < typename T > class async_msg;
#define TBB_PREVIEW_FLOW_GRAPH_FEATURES 1 #include "tbb/flow_graph.h"
async_msg または async_msg から派生したユーザー型は、フローグラフのノードにより特別に制御されます。async_msg<T> は、結果の準備ができる前にグラフで処理できる T 型の非同期ジョブ結果の表現です。結果はいつでも async_msg<T>::set( T result ) メソッドを使用した非同期処理により提供することができます。ほかのクラスは、追加データを含む高度な非同期メッセージを作成する async_msg<T> から継承することができます。T 型および非同期メッセージを受け付けるポートとノードは、どちらも非同期メッセージを受け取ることができ、make_edge インターフェイスに接続できますが、制御方法は異なります。ノードが非同期メッセージを受け付けた場合、(例えば、メッセージをボディーに直接渡して) 着信非同期メッセージに直ちに反応します。しかし、ノードが T 結果型を受け付けた場合、async_msg<T>::finalize() 仮想メソッドが呼び出され、ノードは結果が利用可能になるのを待ちます。async_msg<T>::finalize() メソッドは、(例えば、結果が必要であると非同期処理に通知したり、コールバックを設定する) 派生した非同期メッセージ型でオーバーライドすることができます。
async_msg<T>::set( T result ) メソッドは一度だけ呼び出す必要があります。非同期メッセージが非同期処理の結果を受け付けるノードに渡されない場合、結果は設定されません。しかし、非同期メッセージが非同期メッセージの結果を受け付けるノードに渡された場合 (この場合は finalize() メソッドが呼び出されます)、結果は設定されます。
ノードが非同期メッセージを受け付けるのに T 結果型のメッセージがノードに提供された場合、非同期メッセージが自動的に作成され、その結果として T 型の着信メッセージが直ちに設定されます。
非同期メッセージを受け付けるポートまたはノードは、同じ型の非同期メッセージまたは結果型のメッセージのみ受け付けることができます。そのため、async_msg<T> を async_msg<T> の派生型の非同期メッセージを受け付けるノードに渡すことは不可能です。
現在の実装は async_msg<T>::finalize() メソッドを何度も呼び出すことができますが、結果が以前に設定されている場合はメソッドを呼び出しません。しかし、この動作は将来のリリースで変更される予定です。
async_msg の最も簡単な使用シナリオを下記の図に示します。フローグラフの左のノード (n1) はボディで非同期処理を開始し、結果が設定されるのを待つことなく async_msg オブジェクトを返します。async_msg のコピーを介して非同期処理により結果が設定されると、次のノード (n2) は自動的に結果を受け取ります。
最も簡単なシナリオのステップを次に示します。
このシナリオでは、async_msg と function_node はともに async_node クラスに非常に似た動作を提供します。
async_node に対する async_msg の主な利点は、次の使用シナリオのように、未変更または更新されたフローグラフを介してこれらのメッセージを渡せることです。
async_msg<T>::finalize() はライブラリーにより数回呼び出されますが、set() メソッドが async_msg で呼び出された後は呼び出されません。
通常、処理チェーンの最初のノードが外部のデータを非同期処理 (スレッドまたはデバイス) にアップロードします。チェーンの次のノードは、追加データをアップロードおよびダウンロードすることなくデータ処理を続行できます。チェーンの最後のノードのみ、処理の結果を取得します。
async_msg から派生したユーザークラスは、上記のユースケースで使用できます。例えば、データ処理チェーンの動作中に追加の状態やフラグを格納または更新する派生した非同期メッセージクラスを実装できます。
次のサンプルで source_node は、ユーザースレッドによって処理されるように、ワークを AsyncActivity に渡します。async_msg は、ダウンストリームの function_node に結果を伝えるために使用されます。
#define TBB_PREVIEW_FLOW_GRAPH_FEATURES 1 #include <thread> #include <atomic> #include "tbb/flow_graph.h" #include "tbb/concurrent_queue.h" using namespace tbb::flow; typedef int input_type; typedef int output_type; typedef tbb::flow::async_msg<output_type> async_msg_type; class AsyncActivity { public: struct work_type { input_type input; async_msg_type msg; }; AsyncActivity(tbb::flow::graph& g) : my_graph(g), my_the_end(false), service_thread( [this]() { work_type w; while( ! my_the_end ) { while( my_work_queue.try_pop(w) ) { output_type result = do_work( w.input ); // 結果をグラフに返す w.msg.set(result); // ワーク完了を通知する (グラフをアンブロック) my_graph.decrement_wait_count(); } } } ) {} ~AsyncActivity() { my_the_end = true; // スレッドを終了すべきであることを示す service_thread.join(); } void submit( input_type i, const async_msg_type& msg ) { work_type w = {i, msg}; my_graph.increment_wait_count(); my_work_queue.push(w); } private: output_type do_work(input_type& v) { // 入力を処理して出力に変換する } tbb::flow::graph& my_graph; tbb::concurrent_queue<work_type> my_work_queue; std::atomic<bool> my_the_end; std::thread service_thread; }; int main() { tbb::flow::graph g; AsyncActivity async_activity(g); tbb::flow::source_node<async_msg_type> s(g, [&](async_msg_type& v)->bool { /* 非同期処理用のデータを生成する */ if ( /* ソースが終了していない */ ) { async_msg_type msg; /* ここで "入力" を生成する */ async_activity.submit(input, msg); v = msg; return true; } return false; }); tbb::flow::function_node<output_type> f( g, unlimited, [](const output_type& v) { /* 非同期処理からデータを取得する */ }); tbb::flow::make_edge( s, f ); g.wait_for_all(); return 0; }
namespace tbb { namespace flow { template <typename T> class async_msg { public: typedef T async_msg_data_type; async_msg(); virtual ~async_msg() {} void set(const T& t); void set(T&& t); protected: // 派生クラスでオーバーライドして非同期計算チェーンが // 終了したことを通知する virtual void finalize() const {} }; } }
メンバー | 説明 |
---|---|
typedef T async_msg_data_type | ユーザーデータ型の定義。 |
async_msg() | デフォルト・コンストラクター。 |
async_msg(const async_msg&) | 自動生成コピー・コンストラクター。 |
~async_msg() | デストラクター。 |
void set(const T& t) | 非同期処理からフローグラフに結果を返すメソッドを呼び出します。 注set() は async_msg ごとに一度だけ呼び出します。 |
void set(T&& t) | 前の set(const T&) メソッドと同じですが、C++ 'move' セマンティクスを使用します。 |
async_msg& operator = (const async_msg&) | 自動生成代入演算子。 |
PROTECTED | |
---|---|
virtual void finalize() const | 派生クラスでメソッドをオーバーライドして、データ制御チェーンが終了したことを非同期処理に通知します。非同期処理は (set() 呼び出しで) 結果を返します。 注非同期処理は set() メソッドをいつでも呼び出すことができますが、finalize() 同期呼び出しはフローグラフの結果を待っているノードが少なくとも 1 つある場合は待つ必要があります。 デフォルトの実装は空です。 |
async_msg クラスはコピー可能であるため、ユーザーはすべての派生クラスで正しいコピールーチンを提供する必要があります。