入力ポートで受け取ったメッセージのセットから tuple<T0,T1, ... > を作成して、タプルをすべてのサクセサーにブロードキャストするノード。join_node クラスは、入力ポートで 3 つのバッファーポリシー (reserving、queueing、key_matching) をサポートします。 デフォルトでは、join_node の入力ポートは queueing ポリシーを使用します。
struct queueing; struct reserving; template<typename K, typename KHash=tbb_hash_compare<K> > struct key_matching; typedef key_matching<tag_value> tag_matching; template<typename OutputTuple, class JP = queueing> class join_node;
#include "tbb/flow_graph.h"
join_node は、graph_node および sender< flow::tuple< T0, T1, ... > > です。OutputTuple の T0 .. TN に対応する receiver<Ti> である入力ポートのタプルが含まれます。異なる型の複数の入力レシーバーをサポートしており、受け取ったメッセージのタプルをすべてのサクセサーにブロードキャストします。join_node の入力ポートはすべて、同じバッファーポリシーを使用しなければなりません。バッファーポリシーに基づく join_node の動作は、下記の表に示されています。
|
バッファーポリシー |
動作 |
|---|---|
|
queueing |
各入力ポートで、着信メッセージがポートの無制限の FIFO (先入れ先出し) キューに追加されます。各入力ポートに少なくとも 1 つのメッセージがある場合、join_node は各キューの先頭を含むタプルをすべてのサクセサーにブロードキャストします。少なくとも 1 つのサクセサーがタプルを受け付けた場合、各入力ポートのキューの先頭は削除されます。その他の場合、メッセージは各入力ポートのキューに残されます。 |
|
reserving |
各入力ポートで、join_node は入力が利用可能であるとマークして false を返します。すべてのポートが利用可能であるとマークされている場合、join_node は既知のプレデセッサーから各ポートのメッセージを予約しようとします。あるポートでメッセージを予約できない場合は、そのポートのマークを解除して、以前取得した予約をすべて解除します。すべてのポートでメッセージを予約できる場合は、これらのメッセージを含むタプルをすべてのサクセサーにブロードキャストします。少なくとも 1 つのサクセサーがタプルを受け付けた場合、予約は消費されます。その他の場合、予約は解除されます。 |
|
key_matching<typename K, class KHash=tbb_hash_compare<K> > |
各入力ポートで、ユーザー定義関数オブジェクトがメッセージに適用され、キーが取得されます。次に、メッセージが各入力ポートのハッシュテーブルに追加されます。指定されたキーの各入力ポートにメッセージがある場合、join_node はすべての一致するメッセージを入力ポートから削除し、一致するメッセージを含むタプルを構築して、すべてのサクセサーにブロードキャストします。タプルを受け付けるサクセサーがない場合、タプルは保存され、後続の try_get に転送されます。 K が参照型 (int& など) の場合、コンストラクターに提供される関数オブジェクトは、タプルの各型 Ti に対して次の型シグネチャーを持ちます。
const K' &(const Ti&)
(ここで K == K'&) K が参照型でない場合、コンストラクターに提供される関数オブジェクトは、タプルの各型 Ti に対して次の型シグネチャーを持ちます。
K (const Ti&)
|
|
tag_matching |
key_matching の特殊化。型 tag_value のキーを受け付けます。それ以外は、key_matching の動作と同じです。 |
join_node のサクセサーがメッセージを拒否した場合、および入力ポートのプレデセッサーからのメッセージの取得に失敗した場合、メッセージ・パッシング・プロトコルを使用して処理されます。
input_port テンプレート関数は、特定の入力ポートへの参照を取得するための構文を単純化します。
OutputTuple は、各要素がコピー構築および代入可能な flow::tuple<T0,T1, ... > でなければなりません。
#include<cstdio>
#include "tbb/flow_graph.h"
using namespace tbb::flow;
int main() {
graph g;
function_node<int,int>
f1( g, unlimited, [](const int &i) { return 2*i; } );
function_node<float,float>
f2( g, unlimited, [](const float &f) { return f/2; } );
join_node< tbb::flow::tuple<int,float> > j(g);
function_node< flow::tuple<int,float> >
f3( g, unlimited,
[]( const flow::tuple<int,float> &t ) {
printf( "Result is %f\n",
std::get<0>(t) + std::get<1>(t));
} );
make_edge( f1, input_port<0>( j ) );
make_edge( f2, input_port<1>( j ) );
make_edge( j, f3 );
f1.try_put( 3 );
f2.try_put( 3 );
g.wait_for_all( );
return 0;
}
上記のサンプルでは、3 つの function_node オブジェクトが作成されます。f1 は int i に 2 を掛けて、f2 は float f を 2 で割って、f3 は flow::tuple<int,float> t を受け取り、要素を互いに追加して結果を出力します。join_node j は、f1 と f2 の出力を組み合わせて、生成されたタプルを f3 に送ります。このサンプルは構文を示すことを目的としているため、ノードでほとんど作業を行っていません。
namespace tbb {
namespace flow {
struct reserving;
struct queueing;
template<typename K, class KHash=tbb_hash_compare<K> >
struct key_matching;
typedef key_matching<tag_value> tag_matching;
template<typename OutputTuple, class JP = queueing>
class join_node :
public graph_node, public sender< OutputTuple > {
public:
typedef OutputTuple output_type;
typedef receiver<output_type> successor_type;
typedef implementation-dependent-tuple input_ports_type;
join_node( graph &g );
join_node( const join_node &src );
input_ports_type &input_ports( );
bool register_successor( successor_type &r );
bool remove_successor( successor_type &r );
bool try_get( output_type &v );
bool try_reserve( output_type &v );
bool try_release( );
bool try_consume( );
};
//
// key_matching の特殊化
//
template<typename OutputTuple, typename K, class KHash=tbb_hash_compare<K> >
class join_node<OutputTuple, key_matching<K,KHash> > :
public graph_node, public sender< OutputTuple > {
public:
// 以前の join_node と同じメソッドで、
// key_matching 関数オブジェクトを指定する
// コンストラクターを持つ
template<typename B0, typename B1>
join_node( graph &g, B0 b0, B1 b1 );
// 3 ~ 10 要素に対するコンストラクターも
// 同様に定義される ...
};
}
}
| メンバー | 説明 |
|---|---|
| join_node( graph &g ) | g のルートタスクを使用してタスクをスポーンする join_node を構築します。 |
| template < typename B0, typename B1, ... > join_node( graph &g, B0 b0, B1 b1, ... ) |
join_node の key_matching の特殊化でのみ利用できるコンストラクター。 関数オブジェクト b0、b1、...、bN を使用して入力ポート 0 から N のタグを決定する join_node を作成します。g のルートタスクを使用してタスクをスポーンします。 注意join_node コンストラクターに渡す関数オブジェクトは例外をスローしてはなりません。これらは並列に呼び出されます。純粋で、最短時間で、非ブロックでなければなりません。 |
| join_node( const join_node &src ) |
src の構築時の状態と同じ初期状態で join_node を構築します。プレデセッサーのリスト、入力ポートのメッセージ、サクセサーはコピーされません。 |
| input_ports_type &input_ports( ) |
戻り値: レシーバーの flow::tuple。各要素は tbb::receiver<T> から継承されます。T はその入力で想定されるメッセージの型です。各タプル要素は、flow::receiver<T> と同じように使用できます。選択した join_node ポリシーに基づくポートの動作は上記の表に示されています。 |
| bool register_successor( successor_type &r ) |
サクセサーのセットに r を追加します。 戻り値: true。 |
| bool remove_successor( successor_type &r ) |
サクセサーのセットから r を削除します。 戻り値: true。 |
| bool try_get( output_type &v ) |
join_node のバッファーポリシーに基づいてタプルを生成しようとします。 戻り値: タプルの生成に成功した場合、生成したタプルを v にコピーして true を返します。その他の場合は false を返します。 |
| bool try_reserve( output_type &v ) |
予約をサポートしません。 戻り値: false。 |
| bool try_release( ) |
予約をサポートしません。 戻り値: false。 |
| bool try_consume( ) |
予約をサポートしません。 戻り値: false。 |
| template<size_t N, typename JNT> typename flow::tuple_element<N, typename JNT::input_ports_type>::type &input_port( JNT &jn ) |
input_port <N>( jn ) の呼び出しは std::get<N>( jn.input_ports() ) の呼び出しと等価です。 戻り値: join_node jn の N 番目の入力ポート。 |