graph_node、receiver<Input>、sender<Output> のテンプレート・クラス。着信メッセージのユーザー定義ボディーを実行します。ボディーは、インテル® TBB スレッドプール外で処理するように、入力メッセージを外部処理に渡します。このノードは、外部処理とフローグラフの通信を可能にする gateway_type インターフェイスも提供します。
template < typename Input,
typename Output,
typename Policy = queueing,
typename Allocator=cache_aligned_allocator<Input> >
class async_node;
#include "tbb/flow_graph.h"
async_node を利用することで、フローグラフとユーザーまたは別のランタイムによって管理される外部処理の通信が可能です。このノードは Input 型のメッセージを受け取り、メッセージを外部処理に渡すためボディーを呼び出します。結果のリターンと async_node の出力ポートへの push には、gateway_type を利用します。
async_node には、ユーザーが設定可能な同時実行数の制限が適用されます。flow::unlimited を指定すると、ボディーの無制限の呼び出しを同時に実行できます。flow::serial を指定すると、ボディーの 1 つの呼び出しのみ同時に実行できます。同時実行数を制限する size_t 型の値を 1 から unlimited の間で指定することもできます。
async_node のボディーコンセプトを次に示します。
|
擬似署名 |
意味 |
|---|---|
|
B::B( const B& ) |
コピー・コンストラクター。 |
|
B::~B() |
デストラクター。 |
|
void operator=( const B& ) |
代入。 |
|
void B::operator()( const input_type &v, gateway_type &gateway ) |
入力値 v は外部処理に渡されます。gateway インターフェイスにより、外部処理とエンクロージング・フローグラフの通信が可能です。 |
async_node に渡されるボディー・オブジェクトはコピーされます。そのため、メンバー変数を更新してもノードの構築に使用されたオリジナルのオブジェクトには影響しません。ノードの外部からボディー・オブジェクト内に保持されている状態を確認する場合は、copy_body 関数を使用して更新されたコピーを取得します。
ボディー・オブジェクトは、そのエンクロージング・グラフをスローまたはキャンセルすることがあります。詳細は、「task_group_context」および「例外」を参照してください。
次に gateway_type インターフェイスを示します。
|
擬似署名 |
意味 |
|---|---|
|
bool try_put( const output_type &v ) |
v を async_node のすべてのサクセサーにブロードキャストします。 |
|
void reserve_wait() |
ワークが外部処理に渡されたことをフローグラフに通知します。 |
|
void release_wait() |
外部処理に渡されたワークが完了したことをフローグラフに通知します。 |
外部処理に渡される入力メッセージごとに gateway_type::reserve_wait() メソッドを呼び出す必要はありません。各 gateway_type::reserve_wait() 呼び出しには、対応する gateway_type::release_wait() 呼び出しが必要です。
graph::wait_for_all() は、対応する release_wait() のない reserve_wait() が存在する間は終了しません。
次のサンプルで async_node は、ユーザースレッドによって処理されるように、AsyncActivity にワークを渡します。
#include "tbb/flow_graph.h"
#include "tbb/concurrent_queue.h"
#include <thread>
using namespace tbb::flow;
typedef int input_type;
typedef int output_type;
typedef tbb::flow::async_node<input_type, output_type> async_node_type;
class AsyncActivity {
public:
typedef async_node_type::gateway_type gateway_type;
struct work_type {
input_type input;
gateway_type* gateway;
};
AsyncActivity() : service_thread( [this]() {
while( !end_of_work() ) {
work_type w;
while( my_work_queue.try_pop(w) ) {
output_type result = do_work( w.input );
// 結果をグラフに返す
w.gateway->try_put( result );
// ワーク完了を通知する
w.gateway->release_wait();
}
}
} ) {}
void submit( input_type i, gateway_type* gateway ) {
work_type w = {i, gateway};
gateway->reserve_wait();
my_work_queue.push(w);
}
private:
bool end_of_work() {
// スレッドを終了すべきであることを示す
}
output_type do_work( input_type& v ) {
// 入力を処理し、出力に変換する
}
tbb::concurrent_queue<work_type> my_work_queue;
std::thread service_thread;
};
int main() {
AsyncActivity async_activity;
tbb::flow::graph g;
async_node_type async_consumer( g, unlimited,
// 非同期処理を初期化するユーザー・ファンクター
[&] ( input_type input, async_node_type::gateway_type& gateway ) {
async_activity.submit( input, &gateway );
} );
tbb::flow::source_node<input_type> s( g, [](input_type& v)->bool { /* 非同期処理用のデータを生成する */ } );
tbb::flow::async_node<output_type> f( g, unlimited, [](const output_type& v) { /* 非同期処理からデータを取得する */ } );
tbb::flow::make_edge( s, async_consumer );
tbb::flow::make_edge( async_consumer, f );
g.wait_for_all();
}
namespace tbb {
namespace flow {
template < typename Input, typename Output,
typename Allocator = cache_aligned_allocator<Input> >
class async_node : public graph_node, public receiver<Input>, public sender<Output> {
public:
template <typename Body>
async_node( graph& g, size_t concurrency, Body body );
async_node( const async_node& src );
typedef implementation-defined gateway_type;
gateway_type& gateway();
// receiver<Input>
typedef Input input_type;
typedef sender<input_type> predecessor_type;
bool try_put( const input_type& v );
bool register_predecessor( predecessor_type& p );
bool remove_predecessor( predecessor_type& p );
// sender<Output>
typedef Output output_type;
typedef receiver<output_type> successor_type;
bool register_successor( successor_type& r );
bool remove_successor( successor_type& r );
bool try_get( output_type& v );
bool try_reserve( output_type& v );
bool try_release();
bool try_consume();
};
}
}
| メンバー | 説明 |
|---|---|
| template<typename Body> async_node(graph& g, size_t concurrency, Body body) |
body のコピーを呼び出す async_node を構築します。ほとんどの場合、body に対する concurrency の呼び出しは同時に行われます。 |
| async_node( const async_node& src ) |
src の構築時の状態と同じ初期状態で async_node を構築します。構築される async_node には、src と同じ グラフ・オブジェクトへの参照、src で使用される初期 body のコピー、src と同じ同時実行数しきい値が含まれます。src のプレデセッサーおよびサクセサーはコピーされません。 注意新しいボディー・オブジェクトは、src の構築時に提供されたオリジナルのボディーのコピーからコピー構築されます。このため、src の構築後に src のボディーのメンバー変数に対して行われた変更は、新しい async_node のボディーに影響しません。 |
| gateway_type& gateway() |
gateway_type インターフェイスへの参照を返します。 |
| bool try_put( const input_type& v ) |
body(v) を実行するタスクがスポーンされます。 戻り値: 常に true。v を 外部処理に渡すのは body の責任です。body によってメッセージが適切に処理されなかった場合、そのメッセージは失われます。 |
| bool try_get( output_type& v ) |
async_node に出力のバッファーは含まれていないため、try_get 呼び出しは常に拒否されます。 戻り値: false。 |
| bool try_reserve( output_type& v ) |
async_node に出力のバッファーは含まれていないため、予約することはできません。 戻り値: false。 |
| bool try_release() |
async_node に出力のバッファーは含まれていないため、予約することはできません。 戻り値: false。 |
| bool try_consume() |
async_node に出力のバッファーは含まれていないため、予約することはできません。 戻り値: false。 |